hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077033 [1/2] - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/streaming/src/test/org/apache/hadoop/streaming/ mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/ mapred/org/apache/hadoop/mapreduce/li...
Date Fri, 04 Mar 2011 03:34:20 GMT
Author: omalley
Date: Fri Mar  4 03:34:19 2011
New Revision: 1077033

URL: http://svn.apache.org/viewvc?rev=1077033&view=rev
Log:
commit 3b1d40f45ba4e129936af01e9dc608d08ed8e3e2
Author: Hemanth Yamijala <yhemanth@apache.org>
Date:   Thu Oct 22 20:31:26 2009 +0530

    MAPREDUCE:947 from https://issues.apache.org/jira/secure/attachment/12422899/mr-947-y20.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-947. Added commitJob and abortJob apis to OutputCommitter.
    +    Enhanced FileOutputCommitter to create a _SUCCESS file for successful
    +    jobs. (Amar Kamat & Jothi Padmanabhan via acmurthy)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Utils.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobStatus.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCleanup.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamDataProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingKeyValue.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/OutputLogFilter.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/OutputCommitter.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/findbugsExcludeFile.xml
    hadoop/common/branches/branch-0.20-security-patches/src/test/mapred-site.xml
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestBadRecords.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestEmptyJob.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJavaSerialization.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobName.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java Fri Mar  4 03:34:19 2011
@@ -18,15 +18,23 @@
 
 package org.apache.hadoop.streaming;
 
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
 import junit.framework.TestCase;
-import java.io.*;
-import java.util.*;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.Utils;
 /**
  * This test case tests the symlink creation
  * utility provided by distributed caching 
@@ -113,7 +121,8 @@ public class TestMultipleCachefiles exte
         String line2 = null;
         Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
                                      new Path(OUTPUT_DIR),
-                                     new OutputLogFilter()));
+                                     new Utils.OutputFileUtils
+                                       .OutputFilesFilter()));
         for (int i = 0; i < fileList.length; i++){
           System.out.println(fileList[i].toString());
           BufferedReader bread =

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java Fri Mar  4 03:34:19 2011
@@ -23,6 +23,7 @@ import java.io.*;
 import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -74,7 +75,7 @@ public class TestStreamAggregate extends
   {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -94,10 +95,12 @@ public class TestStreamAggregate extends
     } catch(Exception e) {
       failTrace(e);
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch(Exception e) {
+        failTrace(e);
+      }
     }
   }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamDataProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamDataProtocol.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamDataProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamDataProtocol.java Fri Mar  4 03:34:19 2011
@@ -23,6 +23,7 @@ import java.io.*;
 import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
 
@@ -82,7 +83,7 @@ public class TestStreamDataProtocol exte
   {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -103,10 +104,12 @@ public class TestStreamDataProtocol exte
     } catch(Exception e) {
       failTrace(e);
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch(Exception e) {
+        failTrace(e);
+      }
     }
   }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java Fri Mar  4 03:34:19 2011
@@ -23,6 +23,7 @@ import java.io.*;
 import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -75,7 +76,7 @@ public class TestStreamReduceNone extend
     File outFile = null;
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -94,11 +95,12 @@ public class TestStreamReduceNone extend
     } catch(Exception e) {
       failTrace(e);
     } finally {
-      outFile.delete();
-      File outFileCRC = new File(OUTPUT_DIR, "."+outFileName+".crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch(Exception e) {
+        failTrace(e);
+      }
     }
   }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java Fri Mar  4 03:34:19 2011
@@ -22,6 +22,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileUtil;
+
 /**
  * This class tests StreamXmlRecordReader
  * The test creates an XML file, uses StreamXmlRecordReader and compares
@@ -61,7 +63,7 @@ public class TestStreamXmlRecordReader e
   public void testCommandLine() {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
       createInput();
@@ -74,10 +76,12 @@ public class TestStreamXmlRecordReader e
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
-      INPUT_FILE.delete();
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch(Exception e) {
+        e.printStackTrace();
+      }
     }
   }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Fri Mar  4 03:34:19 2011
@@ -21,6 +21,8 @@ package org.apache.hadoop.streaming;
 import junit.framework.TestCase;
 import java.io.*;
 
+import org.apache.hadoop.fs.FileUtil;
+
 /**
  * This class tests hadoopStreaming in MapReduce local mode.
  */
@@ -73,7 +75,7 @@ public class TestStreaming extends TestC
   {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -91,10 +93,8 @@ public class TestStreaming extends TestC
       System.err.println("  out1=" + output);
       assertEquals(outputExpect, output);
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
       INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
     }
   }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java Fri Mar  4 03:34:19 2011
@@ -37,9 +37,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SkipBadRecords;
+import org.apache.hadoop.mapred.Utils;
 
 public class TestStreamingBadRecords extends ClusterMapReduceTestCase
 {
@@ -118,7 +118,7 @@ public class TestStreamingBadRecords ext
     badRecs.addAll(REDUCER_BAD_RECORDS);
     Path[] outputFiles = FileUtil.stat2Paths(
         getFileSystem().listStatus(getOutputDir(),
-        new OutputLogFilter()));
+            new Utils.OutputFileUtils.OutputFilesFilter()));
     
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java Fri Mar  4 03:34:19 2011
@@ -21,6 +21,8 @@ package org.apache.hadoop.streaming;
 import junit.framework.TestCase;
 import java.io.*;
 
+import org.apache.hadoop.fs.FileUtil;
+
 /**
  * This class tests hadoopStreaming in MapReduce local mode by giving
  * empty input to mapper and the mapper generates nonempty output. Since map()
@@ -77,7 +79,7 @@ public class TestStreamingEmptyInpNonemp
   {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -100,12 +102,14 @@ public class TestStreamingEmptyInpNonemp
       outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
       outFile.delete();
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      SCRIPT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
-    }
+      try {
+        INPUT_FILE.delete();
+        SCRIPT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch(Exception e) {
+        e.printStackTrace();
+      }
+   }
   }
 
   public static void main(String[]args) throws Exception

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java Fri Mar  4 03:34:19 2011
@@ -75,7 +75,11 @@ public class TestStreamingFailure extend
     } catch(Exception e) {
       // Expecting an exception
     } finally {
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
     }
   }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingKeyValue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingKeyValue.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingKeyValue.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingKeyValue.java Fri Mar  4 03:34:19 2011
@@ -21,6 +21,8 @@ package org.apache.hadoop.streaming;
 import junit.framework.TestCase;
 import java.io.*;
 
+import org.apache.hadoop.fs.FileUtil;
+
 /**
  * This class tests hadoopStreaming in MapReduce local mode.
  * This testcase looks at different cases of tab position in input. 
@@ -78,7 +80,7 @@ public class TestStreamingKeyValue exten
     File outFile = null;
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -96,13 +98,13 @@ public class TestStreamingKeyValue exten
       assertEquals(outputExpect, output);
     } catch(Exception e) {
       failTrace(e);
-    } finally {
-      outFile.delete();
-      File outFileCRC = new File(OUTPUT_DIR,
-                          "." + outFileName + ".crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+    } finally { 
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch(Exception e) {
+        failTrace(e);
+      }
     }
   }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java Fri Mar  4 03:34:19 2011
@@ -23,6 +23,7 @@ import java.io.*;
 import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -89,7 +90,7 @@ public class TestStreamingSeparator exte
   {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -109,10 +110,12 @@ public class TestStreamingSeparator exte
     } catch(Exception e) {
       failTrace(e);
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch(Exception e) {
+        failTrace(e);
+      }
     }
   }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java Fri Mar  4 03:34:19 2011
@@ -18,15 +18,23 @@
 
 package org.apache.hadoop.streaming;
 
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
 import junit.framework.TestCase;
-import java.io.*;
-import java.util.*;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.Utils;
 /**
  * This test case tests the symlink creation
  * utility provided by distributed caching 
@@ -88,7 +96,8 @@ public class TestSymLink extends TestCas
           "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE + "#testlink"
         };
 
-        fileSys.delete(new Path(OUTPUT_DIR));
+        fileSys.delete(new Path(OUTPUT_DIR), true);
+      
         
         DataOutputStream file = fileSys.create(new Path(INPUT_FILE));
         file.writeBytes(mapString);
@@ -104,7 +113,8 @@ public class TestSymLink extends TestCas
         String line = null;
         Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
                                                 new Path(OUTPUT_DIR),
-                                                new OutputLogFilter()));
+                                                new Utils.OutputFileUtils
+                                                .OutputFilesFilter()));
         for (int i = 0; i < fileList.length; i++){
           System.out.println(fileList[i].toString());
           BufferedReader bread =

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java Fri Mar  4 03:34:19 2011
@@ -39,6 +39,9 @@ public class FileOutputCommitter extends
    * Temporary directory name 
    */
   public static final String TEMP_DIR_NAME = "_temporary";
+  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+    "mapreduce.fileoutputcommitter.marksuccessfuljobs";
 
   public void setupJob(JobContext context) throws IOException {
     JobConf conf = context.getJobConf();
@@ -52,7 +55,35 @@ public class FileOutputCommitter extends
     }
   }
 
-  public void cleanupJob(JobContext context) throws IOException {
+  private static boolean getOutputDirMarking(JobConf conf) {
+    return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, 
+                           true);
+  }
+
+  // Mark the output dir of the job for which the context is passed.
+  private void markSuccessfulOutputDir(JobContext context) 
+  throws IOException {
+    JobConf conf = context.getJobConf();
+    Path outputPath = FileOutputFormat.getOutputPath(conf);
+    if (outputPath != null) {
+      FileSystem fileSys = outputPath.getFileSystem(conf);
+      // create a file in the folder to mark it
+      if (fileSys.exists(outputPath)) {
+        Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+        fileSys.create(filePath).close();
+      }
+    }
+  }
+  
+  @Override
+  public void commitJob(JobContext context) throws IOException {
+    cleanup(context);
+    if (getOutputDirMarking(context.getJobConf())) {
+      markSuccessfulOutputDir(context);
+    }
+  }
+  
+  private void cleanup(JobContext context) throws IOException {
     JobConf conf = context.getJobConf();
     // do the clean up of temporary directory
     Path outputPath = FileOutputFormat.getOutputPath(conf);
@@ -63,9 +94,22 @@ public class FileOutputCommitter extends
       if (fileSys.exists(tmpDir)) {
         fileSys.delete(tmpDir, true);
       }
+    } else {
+      LOG.warn("Output path is null in cleanup");
     }
   }
 
+  /**
+   * Delete the temporary directory, including all of the work directories.
+   * @param context the job's context
+   * @param runState final run state of the job, should be
+   * {@link JobStatus#KILLED} or {@link JobStatus#FAILED}
+   */
+  @Override
+  public void abortJob(JobContext context, int runState) throws IOException {
+    cleanup(context);
+  }
+  
   public void setupTask(TaskAttemptContext context) throws IOException {
     // FileOutputCommitter's setupTask doesn't do anything. Because the
     // temporary task directory is created on demand when the 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 03:34:19 2011
@@ -1215,6 +1215,20 @@ class JobInProgress {
       if (result != null) {
         addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
       }
+
+      if (result != null) {
+        addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+        if (jobFailed) {
+          result.setJobCleanupTaskState
+          (org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
+        } else if (jobKilled) {
+          result.setJobCleanupTaskState
+          (org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
+        } else {
+          result.setJobCleanupTaskState
+          (org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
+        }
+      }
       return result;
     }
     

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java Fri Mar  4 03:34:19 2011
@@ -322,4 +322,10 @@ public class JobStatus implements Writab
     this.priority = WritableUtils.readEnum(in, JobPriority.class);
     this.schedulingInfo = Text.readString(in);
   }
+
+  // A utility to convert new job runstates to the old ones.
+  static int getOldNewJobRunState(
+    org.apache.hadoop.mapreduce.JobStatus.State state) {
+    return state.getValue();
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar  4 03:34:19 2011
@@ -231,7 +231,7 @@ class LocalJobRunner implements JobSubmi
           }
         }
         // delete the temporary directory in output directory
-        outputCommitter.cleanupJob(jContext);
+        outputCommitter.commitJob(jContext);
         status.setCleanupProgress(1.0f);
 
         if (killed) {
@@ -244,7 +244,7 @@ class LocalJobRunner implements JobSubmi
 
       } catch (Throwable t) {
         try {
-          outputCommitter.cleanupJob(jContext);
+          outputCommitter.abortJob(jContext, JobStatus.FAILED);
         } catch (IOException ioe) {
           LOG.info("Error cleaning up job:" + id);
         }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java Fri Mar  4 03:34:19 2011
@@ -68,13 +68,37 @@ public abstract class OutputCommitter 
 
   /**
    * For cleaning up the job's output after job completion
+   * @deprecated use {@link #commitJob(JobContext)} or
+   *                 {@link #abortJob(JobContext, int)} instead
+   */
+  @Deprecated
+  public void cleanupJob(JobContext jobContext) throws IOException { }
+
+  /**
+   * For committing job's output after successful job completion. Note that this
+   * is invoked for jobs with final run state as {@link JobStatus#SUCCEEDED}.
    * 
    * @param jobContext Context of the job whose output is being written.
-   * @throws IOException
+   * @throws IOException 
    */
-  public abstract void cleanupJob(JobContext jobContext) throws IOException;
+  public void commitJob(JobContext jobContext) throws IOException { 
+    cleanupJob(jobContext);
+  }
 
   /**
+   * For cleaning up the job's output after job failure.
+   * 
+   * @param jobContext Context of the job whose output is being written.
+   * @param status Final run state of the job, should be 
+   * {@link JobStatus#KILLED} or {@link JobStatus#FAILED}
+   * @throws IOException
+   */
+  public void abortJob(JobContext jobContext, int status) 
+  throws IOException {
+    cleanupJob(jobContext);
+  }
+  
+  /**
    * Sets up output for the task.
    * 
    * @param taskContext Context of the task whose output is being written.
@@ -130,6 +154,7 @@ public abstract class OutputCommitter 
    * is a bridge between the two.
    */
   @Override
+  @Deprecated
   public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context
                                ) throws IOException {
     cleanupJob((JobContext) context);
@@ -141,6 +166,33 @@ public abstract class OutputCommitter 
    * is a bridge between the two.
    */
   @Override
+  public final void commitJob(org.apache.hadoop.mapreduce.JobContext context
+                               ) throws IOException {
+    commitJob((JobContext) context);
+  }
+
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
+  public final void abortJob(org.apache.hadoop.mapreduce.JobContext context, 
+                          org.apache.hadoop.mapreduce.JobStatus.State runState) 
+  throws IOException {
+    int state = JobStatus.getOldNewJobRunState(runState);
+    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
+      throw new IOException ("Invalid job run state : " + runState.name());
+    }
+    abortJob((JobContext) context, state);
+  }
+  
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
   public final 
   void setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
                  ) throws IOException {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/OutputLogFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/OutputLogFilter.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/OutputLogFilter.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/OutputLogFilter.java Fri Mar  4 03:34:19 2011
@@ -27,9 +27,15 @@ import org.apache.hadoop.fs.PathFilter;
  * This can be used to list paths of output directory as follows:
  *   Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
  *                                   new OutputLogFilter()));
+ * @deprecated Use 
+ *   {@link org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputLogFilter} 
+ *   instead.
  */
 public class OutputLogFilter implements PathFilter {
+  private static final PathFilter LOG_FILTER = 
+    new Utils.OutputFileUtils.OutputLogFilter();
+     
   public boolean accept(Path path) {
-    return !(path.toString().contains("_logs"));
+    return LOG_FILTER.accept(path);
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar  4 03:34:19 2011
@@ -42,9 +42,11 @@ import org.apache.hadoop.io.DataInputBuf
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
@@ -117,6 +119,7 @@ abstract public class Task implements Wr
   private TaskAttemptID taskId;                   // unique, includes job id
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
+  protected JobStatus.State jobRunStateForCleanup;
   protected boolean jobCleanup = false;
   protected boolean jobSetup = false;
   protected boolean taskCleanup = false;
@@ -309,6 +312,14 @@ abstract public class Task implements Wr
     return jobCleanup;
   }
 
+  boolean isJobAbortTask() {
+    // the task is an abort task if its marked for cleanup and the final 
+    // expected state is either failed or killed.
+    return isJobCleanupTask() 
+           && (jobRunStateForCleanup == JobStatus.State.KILLED 
+               || jobRunStateForCleanup == JobStatus.State.FAILED);
+  }
+  
   boolean isJobSetupTask() {
     return jobSetup;
   }
@@ -321,6 +332,14 @@ abstract public class Task implements Wr
     jobCleanup = true; 
   }
 
+  /**
+   * Sets the task to do job abort in the cleanup.
+   * @param status the final runstate of the job 
+   */
+  void setJobCleanupTaskState(JobStatus.State status) {
+    jobRunStateForCleanup = status;
+  }
+  
   boolean isMapOrReduce() {
     return !jobSetup && !jobCleanup && !taskCleanup;
   }
@@ -341,6 +360,9 @@ abstract public class Task implements Wr
     skipRanges.write(out);
     out.writeBoolean(skipping);
     out.writeBoolean(jobCleanup);
+    if (jobCleanup) {
+      WritableUtils.writeEnum(out, jobRunStateForCleanup);
+    }
     out.writeBoolean(jobSetup);
     Text.writeString(out, username);
     out.writeBoolean(writeSkipRecs);
@@ -359,6 +381,10 @@ abstract public class Task implements Wr
     currentRecStartIndex = currentRecIndexIterator.next();
     skipping = in.readBoolean();
     jobCleanup = in.readBoolean();
+    if (jobCleanup) {
+      jobRunStateForCleanup = 
+        WritableUtils.readEnum(in, JobStatus.State.class);
+    }
     jobSetup = in.readBoolean();
     username = Text.readString(in);
     writeSkipRecs = in.readBoolean();
@@ -832,7 +858,21 @@ abstract public class Task implements Wr
     getProgress().setStatus("cleanup");
     statusUpdate(umbilical);
     // do the cleanup
-    committer.cleanupJob(jobContext);
+    LOG.info("Cleaning up job");
+    if (jobRunStateForCleanup == JobStatus.State.FAILED 
+        || jobRunStateForCleanup == JobStatus.State.KILLED) {
+      LOG.info("Aborting job with runstate : " + jobRunStateForCleanup);
+          committer.abortJob(jobContext, jobRunStateForCleanup);
+    } else if (jobRunStateForCleanup == JobStatus.State.SUCCEEDED){
+      LOG.info("Committing job");
+      committer.commitJob(jobContext);
+    } else {
+      throw new IOException("Invalid state of the job for cleanup. State found "
+                            + jobRunStateForCleanup + " expecting "
+                            + JobStatus.State.SUCCEEDED + ", " 
+                            + JobStatus.State.FAILED + " or "
+                            + JobStatus.State.KILLED);
+    }
     done(umbilical, reporter);
   }
 

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Utils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Utils.java?rev=1077033&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Utils.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Utils.java Fri Mar  4 03:34:19 2011
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * A utility class. It provides
+ *   - file-util
+ *     - A path filter utility to filter out output/part files in the output dir
+ */
+public class Utils {
+  public static class OutputFileUtils {
+    /**
+     * This class filters output(part) files from the given directory
+     * It does not accept files with filenames _logs and _SUCCESS.
+     * This can be used to list paths of output directory as follows:
+     *   Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+     *                                         new OutputFilesFilter()));
+     */
+    public static class OutputFilesFilter extends OutputLogFilter {
+      public boolean accept(Path path) {
+        return super.accept(path) 
+               && !FileOutputCommitter.SUCCEEDED_FILE_NAME
+                   .equals(path.getName());
+      }
+    }
+    
+    /**
+     * This class filters log files from directory given
+     * It doesnt accept paths having _logs.
+     * This can be used to list paths of output directory as follows:
+     *   Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+     *                                   new OutputLogFilter()));
+     */
+    public static class OutputLogFilter implements PathFilter {
+      public boolean accept(Path path) {
+        return !(path.toString().contains("_logs"));
+      }
+    }
+  }
+}
+

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobStatus.java?rev=1077033&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobStatus.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobStatus.java Fri Mar  4 03:34:19 2011
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+/**
+ * Describes the current status of a job.
+ */
+public class JobStatus {
+  /**
+   * Current state of the job 
+   */
+  public static enum State {
+    RUNNING(1),
+    SUCCEEDED(2),
+    FAILED(3),
+    PREP(4),
+    KILLED(5);
+
+    int value;
+
+    State(int value) {
+      this.value = value;
+    }
+
+    public int getValue() {
+      return value;
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/OutputCommitter.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/OutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/OutputCommitter.java Fri Mar  4 03:34:19 2011
@@ -65,14 +65,41 @@ public abstract class OutputCommitter {
   public abstract void setupJob(JobContext jobContext) throws IOException;
 
   /**
-   * For cleaning up the job's output after job completion
+   * For cleaning up the job's output after job completion. Note that this
+   * is invoked for jobs with final run state as 
+   * {@link JobStatus.State#SUCCEEDED}
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException
    */
-  public abstract void cleanupJob(JobContext jobContext) throws IOException;
+  public void commitJob(JobContext jobContext) throws IOException {
+    cleanupJob(jobContext);
+  }
+
+  /**
+   * For cleaning up the job's output after job completion
+   * @deprecated use {@link #commitJob(JobContext)} or
+   *                 {@link #abortJob(JobContext, JobStatus.State)} instead
+   */
+  @Deprecated
+  public void cleanupJob(JobContext context) throws IOException { }
 
   /**
+   * For aborting an unsuccessful job's output. Note that this is invoked for 
+   * jobs with final run state as {@link JobStatus.State#FAILED} or 
+   * {@link JobStatus.State#KILLED}.
+ 
+   * @param jobContext Context of the job whose output is being written.
+   * @param state final run state of the job, should be either 
+   * {@link JobStatus.State#KILLED} or {@link JobStatus.State#FAILED} 
+   * @throws IOException
+   */
+  public void abortJob(JobContext jobContext, JobStatus.State state) 
+  throws IOException {
+    cleanupJob(jobContext);
+  }
+  
+  /**
    * Sets up output for the task.
    * 
    * @param taskContext Context of the task whose output is being written.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Fri Mar  4 03:34:19 2011
@@ -23,10 +23,12 @@ import java.net.URI;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -43,6 +45,9 @@ public class FileOutputCommitter extends
    * Temporary directory name 
    */
   protected static final String TEMP_DIR_NAME = "_temporary";
+  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+    "mapreduce.fileoutputcommitter.marksuccessfuljobs";
   private FileSystem outputFileSystem = null;
   private Path outputPath = null;
   private Path workPath = null;
@@ -80,21 +85,64 @@ public class FileOutputCommitter extends
     }
   }
 
+  private static boolean shouldMarkOutputDir(Configuration conf) {
+    return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, 
+                           true);
+  }
+
+  // Mark the output dir of the job for which the context is passed.
+  private void markOutputDirSuccessful(JobContext context)
+  throws IOException {
+    if (outputPath != null) {
+      FileSystem fileSys = outputPath.getFileSystem(context.getConfiguration());
+      if (fileSys.exists(outputPath)) {
+        // create a file in the folder to mark it
+        Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+        fileSys.create(filePath).close();
+      }
+    }
+  }
+
   /**
    * Delete the temporary directory, including all of the work directories.
-   * @param context the job's context
+   * This is called for all jobs whose final run state is SUCCEEDED
+   * @param context the job's context.
    */
-  public void cleanupJob(JobContext context) throws IOException {
+  public void commitJob(JobContext context) throws IOException {
+    // delete the _temporary folder
+    cleanup(context);
+    // check if the o/p dir should be marked
+    if (shouldMarkOutputDir(context.getConfiguration())) {
+      // create a _success file in the o/p folder
+      markOutputDirSuccessful(context);
+    }
+  }
+
+  private void cleanup(JobContext context) 
+  throws IOException {
     if (outputPath != null) {
       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
       FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
       if (fileSys.exists(tmpDir)) {
         fileSys.delete(tmpDir, true);
       }
+    } else {
+      LOG.warn("Output path is null in cleanup");
     }
   }
 
   /**
+   * Delete the temporary directory, including all of the work directories.
+   * @param context the job's context
+   * @param state final run state of the job, should be FAILED or KILLED
+   */
+  @Override
+  public void abortJob(JobContext context, JobStatus.State state)
+  throws IOException {
+    cleanup(context);
+  }
+  
+  /**
    * No task setup required.
    */
   @Override

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java Fri Mar  4 03:34:19 2011
@@ -46,6 +46,7 @@ public class NullOutputFormat<K, V> exte
     return new OutputCommitter() {
       public void abortTask(TaskAttemptContext taskContext) { }
       public void cleanupJob(JobContext jobContext) { }
+      public void commitJob(JobContext jobContext) { }
       public void commitTask(TaskAttemptContext taskContext) { }
       public boolean needsTaskCommit(TaskAttemptContext taskContext) {
         return false;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/findbugsExcludeFile.xml?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/findbugsExcludeFile.xml Fri Mar  4 03:34:19 2011
@@ -28,6 +28,16 @@
        <Field name="out" />
        <Bug pattern="IS2_INCONSISTENT_SYNC" />
      </Match>
+     <Match>
+       <Class name="org.apache.hadoop.mapred.OutputCommitter" />
+       <Or>
+       <Method name="abortJob" />
+       <Method name="commitJob" />
+       <Method name="cleanupJob" />
+       </Or>
+       <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
+     </Match>
+
      <!--
        TFile
      -->
@@ -55,5 +65,15 @@
        <Method name="reportFatalError" />
        <Bug pattern="DM_EXIT" />
      </Match>
-    
+    <!--
+       We need to cast objects between old and new api objects
+     -->
+     <Match>
+       <Class name="org.apache.hadoop.mapred.OutputCommitter" />
+       <Bug pattern="BC_UNCONFIRMED_CAST" />
+     </Match>
+     <Match>
+       <Class name="org.apache.hadoop.mapred.FileOutputCommitter" />
+       <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
+     </Match>
 </FindBugsFilter>

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/mapred-site.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/mapred-site.xml?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/mapred-site.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/mapred-site.xml Fri Mar  4 03:34:19 2011
@@ -14,5 +14,8 @@
   <value>hosts.exclude</value>
   <description></description>
 </property>
-
+<property>
+  <name>mapreduce.fileoutputcommitter.marksuccessfuljobs</name>
+  <value>false</value>
+</property>
 </configuration>

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java Fri Mar  4 03:34:19 2011
@@ -82,8 +82,8 @@ public class TestNoDefaultsJobConf exten
     JobClient.runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
-                           getFileSystem().listStatus(outDir,
-                           new OutputLogFilter()));
+                   getFileSystem().listStatus(outDir,
+                   new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestBadRecords.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestBadRecords.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestBadRecords.java Fri Mar  4 03:34:19 2011
@@ -165,7 +165,7 @@ public class TestBadRecords extends Clus
     
     Path[] outputFiles = FileUtil.stat2Paths(
         getFileSystem().listStatus(getOutputDir(),
-        new OutputLogFilter()));
+        new Utils.OutputFileUtils.OutputFilesFilter()));
     
     List<String> mapperOutput=getProcessed(input, mapperBadRecords);
     LOG.debug("mapperOutput " + mapperOutput.size());

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java Fri Mar  4 03:34:19 2011
@@ -63,8 +63,8 @@ public class TestClusterMapReduceTestCas
     JobClient.runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
-                           getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                    getFileSystem().listStatus(getOutputDir(),
+                    new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestEmptyJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestEmptyJob.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestEmptyJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestEmptyJob.java Fri Mar  4 03:34:19 2011
@@ -52,7 +52,7 @@ public class TestEmptyJob extends TestCa
    */
   static class CommitterWithDelayCleanup extends FileOutputCommitter {
     @Override
-    public void cleanupJob(JobContext context) throws IOException {
+    public void commitJob(JobContext context) throws IOException {
       Configuration conf = context.getConfiguration();
       Path share = new Path(conf.get("share"));
       FileSystem fs = FileSystem.get(conf);
@@ -64,7 +64,7 @@ public class TestEmptyJob extends TestCa
         }
         UtilsForTests.waitFor(100);
       }
-      super.cleanupJob(context);
+      super.commitJob(context);
     }
   }
 
@@ -195,7 +195,8 @@ public class TestEmptyJob extends TestCa
         + " and not 1.0", runningJob.cleanupProgress() == 1.0);
 
     assertTrue("Job output directory doesn't exit!", fs.exists(outDir));
-    FileStatus[] list = fs.listStatus(outDir, new OutputLogFilter());
+    FileStatus[] list = fs.listStatus(outDir, 
+        new Utils.OutputFileUtils.OutputFilesFilter());
     assertTrue("Number of part-files is " + list.length + " and not "
         + numReduces, list.length == numReduces);
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java Fri Mar  4 03:34:19 2011
@@ -73,7 +73,7 @@ public class TestFileOutputCommitter ext
       theRecordWriter.close(reporter);
     }
     committer.commitTask(tContext);
-    committer.cleanupJob(jContext);
+    committer.commitJob(jContext);
     
     File expectedFile = new File(new Path(outDir, file).toString());
     StringBuffer expectedOutput = new StringBuffer();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJavaSerialization.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJavaSerialization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJavaSerialization.java Fri Mar  4 03:34:19 2011
@@ -96,7 +96,7 @@ public class TestJavaSerialization exten
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(1, outputFiles.length);
     InputStream is = getFileSystem().open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));
@@ -144,7 +144,7 @@ public class TestJavaSerialization exten
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(1, outputFiles.length);
 }
 

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCleanup.java?rev=1077033&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCleanup.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCleanup.java Fri Mar  4 03:34:19 2011
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+/**
+ * A JUnit test to test Map-Reduce job cleanup.
+ */
+public class TestJobCleanup extends TestCase {
+  private static String TEST_ROOT_DIR =
+      new File(System.getProperty("test.build.data", "/tmp") + "/" 
+               + "test-job-cleanup").toString();
+  private static final String ABORT_KILLED_FILE_NAME = 
+    "_custom_abort_killed";
+  private static final String ABORT_FAILED_FILE_NAME = 
+    "_custom_abort_failed";
+  private static FileSystem fileSys = null;
+  private static MiniMRCluster mr = null;
+  private static Path inDir = null;
+  private static Path emptyInDir = null;
+  private static int outDirs = 0;
+  
+  public static Test suite() {
+    TestSetup setup = new TestSetup(new TestSuite(TestJobCleanup.class)) {
+      protected void setUp() throws Exception {
+        JobConf conf = new JobConf();
+        fileSys = FileSystem.get(conf);
+        fileSys.delete(new Path(TEST_ROOT_DIR), true);
+        conf.set("mapred.job.tracker.handler.count", "1");
+        conf.set("mapred.job.tracker", "127.0.0.1:0");
+        conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
+        conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");
+
+        mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+        inDir = new Path(TEST_ROOT_DIR, "test-input");
+        String input = "The quick brown fox\n" + "has many silly\n"
+                       + "red fox sox\n";
+        DataOutputStream file = fileSys.create(new Path(inDir, "part-" + 0));
+        file.writeBytes(input);
+        file.close();
+        emptyInDir = new Path(TEST_ROOT_DIR, "empty-input");
+        fileSys.mkdirs(emptyInDir);
+      }
+      
+      protected void tearDown() throws Exception {
+        if (fileSys != null) {
+          fileSys.close();
+        }
+        if (mr != null) {
+          mr.shutdown();
+        }
+      }
+    };
+    return setup;
+  }
+  
+  /** 
+   * Committer with abort making a _failed/_killed in the output folder
+   */
+  static class CommitterWithCustomAbort extends FileOutputCommitter {
+    @Override
+    public void abortJob(JobContext context, int state) 
+    throws IOException {
+      JobConf conf = context.getJobConf();
+      Path outputPath = FileOutputFormat.getOutputPath(conf);
+      FileSystem fs = outputPath.getFileSystem(conf);
+      String fileName = (state == JobStatus.FAILED) 
+                        ? TestJobCleanup.ABORT_FAILED_FILE_NAME 
+                        : TestJobCleanup.ABORT_KILLED_FILE_NAME;
+      fs.create(new Path(outputPath, fileName)).close();
+    }
+  }
+  
+  private Path getNewOutputDir() {
+    return new Path(TEST_ROOT_DIR, "output-" + outDirs++);
+  }
+  
+  private void configureJob(JobConf jc, String jobName, int maps, int reds, 
+                            Path outDir) {
+    jc.setJobName(jobName);
+    jc.setInputFormat(TextInputFormat.class);
+    jc.setOutputKeyClass(LongWritable.class);
+    jc.setOutputValueClass(Text.class);
+    FileInputFormat.setInputPaths(jc, inDir);
+    FileOutputFormat.setOutputPath(jc, outDir);
+    jc.setMapperClass(IdentityMapper.class);
+    jc.setReducerClass(IdentityReducer.class);
+    jc.setNumMapTasks(maps);
+    jc.setNumReduceTasks(reds);
+    jc.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true);
+  }
+  
+  // run a job with 1 map and let it run to completion
+  private void testSuccessfulJob(String filename, 
+    Class<? extends OutputCommitter> committer, String[] exclude) 
+  throws IOException {
+    JobConf jc = mr.createJobConf();
+    Path outDir = getNewOutputDir();
+    configureJob(jc, "job with cleanup()", 1, 0, outDir);
+    jc.setOutputCommitter(committer);
+    
+    JobClient jobClient = new JobClient(jc);
+    RunningJob job = jobClient.submitJob(jc);
+    JobID id = job.getID();
+    job.waitForCompletion();
+    
+    Path testFile = new Path(outDir, filename);
+    assertTrue("Done file missing for job " + id, fileSys.exists(testFile));
+    
+    // check if the files from the missing set exists
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for successful job " 
+                  + id, fileSys.exists(file));
+    }
+  }
+  
+  // run a job for which all the attempts simply fail.
+  private void testFailedJob(String fileName, 
+    Class<? extends OutputCommitter> committer, String[] exclude) 
+  throws IOException {
+    JobConf jc = mr.createJobConf();
+    Path outDir = getNewOutputDir();
+    configureJob(jc, "fail job with abort()", 1, 0, outDir);
+    jc.setMaxMapAttempts(1);
+    // set the job to fail
+    jc.setMapperClass(UtilsForTests.FailMapper.class);
+    jc.setOutputCommitter(committer);
+
+    JobClient jobClient = new JobClient(jc);
+    RunningJob job = jobClient.submitJob(jc);
+    JobID id = job.getID();
+    job.waitForCompletion();
+    
+    if (fileName != null) {
+      Path testFile = new Path(outDir, fileName);
+      assertTrue("File " + testFile + " missing for failed job " + id, 
+                 fileSys.exists(testFile));
+    }
+    
+    // check if the files from the missing set exists
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for failed job "
+                  + id, fileSys.exists(file));
+    }
+  }
+  
+  // run a job which gets stuck in mapper and kill it.
+  private void testKilledJob(String fileName,
+    Class<? extends OutputCommitter> committer, String[] exclude) 
+  throws IOException {
+    JobConf jc = mr.createJobConf();
+    Path outDir = getNewOutputDir();
+    configureJob(jc, "kill job with abort()", 1, 0, outDir);
+    // set the job to wait for long
+    jc.setMapperClass(UtilsForTests.KillMapper.class);
+    jc.setOutputCommitter(committer);
+    
+    JobClient jobClient = new JobClient(jc);
+    RunningJob job = jobClient.submitJob(jc);
+    JobID id = job.getID();
+    JobInProgress jip = 
+      mr.getJobTrackerRunner().getJobTracker().getJob(job.getID());
+    
+    // wait for the map to be launched
+    while (true) {
+      if (jip.runningMaps() == 1) {
+        break;
+      }
+      UtilsForTests.waitFor(100);
+    }
+    
+    job.killJob(); // kill the job
+    
+    job.waitForCompletion(); // wait for the job to complete
+    
+    if (fileName != null) {
+      Path testFile = new Path(outDir, fileName);
+      assertTrue("File " + testFile + " missing for job " + id, 
+                 fileSys.exists(testFile));
+    }
+    
+    // check if the files from the missing set exists
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for killed job "
+                  + id, fileSys.exists(file));
+    }
+  }
+  
+  /**
+   * Test default cleanup/abort behavior
+   * 
+   * @throws IOException
+   */
+  public void testDefaultCleanupAndAbort() throws IOException {
+    // check with a successful job
+    testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
+                      FileOutputCommitter.class,
+                      new String[] {});
+    
+    // check with a failed job
+    testFailedJob(null, 
+                  FileOutputCommitter.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+    
+    // check default abort job kill
+    testKilledJob(null, 
+                  FileOutputCommitter.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+  }
+  
+  /**
+   * Test if a failed job with custom committer runs the abort code.
+   * 
+   * @throws IOException
+   */
+  public void testCustomAbort() throws IOException {
+    // check with a successful job
+    testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME, 
+                      CommitterWithCustomAbort.class,
+                      new String[] {ABORT_FAILED_FILE_NAME, 
+                                    ABORT_KILLED_FILE_NAME});
+    
+    // check with a failed job
+    testFailedJob(ABORT_FAILED_FILE_NAME, CommitterWithCustomAbort.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
+                                ABORT_KILLED_FILE_NAME});
+    
+    // check with a killed job
+    testKilledJob(ABORT_KILLED_FILE_NAME, CommitterWithCustomAbort.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
+                                ABORT_FAILED_FILE_NAME});
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobName.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobName.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobName.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobName.java Fri Mar  4 03:34:19 2011
@@ -61,7 +61,8 @@ public class TestJobName extends Cluster
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
+
     assertEquals(1, outputFiles.length);
     InputStream is = getFileSystem().open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));
@@ -95,7 +96,8 @@ public class TestJobName extends Cluster
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
+
     assertEquals(1, outputFiles.length);
     InputStream is = getFileSystem().open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java Fri Mar  4 03:34:19 2011
@@ -77,7 +77,7 @@ public class TestMiniMRClasspath extends
     {
       Path[] parents = FileUtil.stat2Paths(fs.listStatus(outDir.getParent()));
       Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-              new OutputLogFilter()));
+          new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         BufferedReader file = 
           new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
@@ -132,7 +132,8 @@ public class TestMiniMRClasspath extends
     JobClient.runJob(conf);
     StringBuffer result = new StringBuffer();
     Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-                                 new OutputLogFilter()));
+        new Utils.OutputFileUtils.OutputFilesFilter()));
+
     for (int i = 0; i < fileList.length; ++i) {
       BufferedReader file = new BufferedReader(new InputStreamReader(
                                                                      fs.open(fileList[i])));

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Mar  4 03:34:19 2011
@@ -101,7 +101,8 @@ public class TestMiniMRWithDFS extends T
     {
       
       Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-                                   new OutputLogFilter()));
+          new Utils.OutputFileUtils.OutputFilesFilter()));
+
       for(int i=0; i < fileList.length; ++i) {
         LOG.info("File list[" + i + "]" + ": "+ fileList[i]);
         BufferedReader file = 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java Fri Mar  4 03:34:19 2011
@@ -49,10 +49,10 @@ public class TestSetupAndCleanupFailure 
     }
   }
 
-  // Commiter with cleanupJob throwing exception
+  // Commiter with commitJob throwing exception
   static class CommitterWithFailCleanup extends FileOutputCommitter {
     @Override
-    public void cleanupJob(JobContext context) throws IOException {
+    public void commitJob(JobContext context) throws IOException {
       throw new IOException();
     }
   }
@@ -78,9 +78,9 @@ public class TestSetupAndCleanupFailure 
     }
     
     @Override
-    public void cleanupJob(JobContext context) throws IOException {
+    public void commitJob(JobContext context) throws IOException {
       waitForSignalFile(FileSystem.get(context.getJobConf()), cleanupSignalFile);
-      super.cleanupJob(context);
+      super.commitJob(context);
     }
   }
   

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java Fri Mar  4 03:34:19 2011
@@ -80,7 +80,7 @@ public class TestUserDefinedCounters ext
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java Fri Mar  4 03:34:19 2011
@@ -22,10 +22,10 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 
+import junit.extensions.TestSetup;
 import junit.framework.Test;
 import junit.framework.TestCase;
 import junit.framework.TestSuite;
-import junit.extensions.TestSetup;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -317,7 +318,8 @@ public class TestDatamerge extends TestC
     job.setOutputFormat(SequenceFileOutputFormat.class);
     JobClient.runJob(job);
 
-    FileStatus[] outlist = cluster.getFileSystem().listStatus(outf);
+    FileStatus[] outlist = cluster.getFileSystem().listStatus(outf, 
+        new Utils.OutputFileUtils.OutputFilesFilter());
     assertEquals(1, outlist.length);
     assertTrue(0 < outlist[0].getLen());
     SequenceFile.Reader r =

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java Fri Mar  4 03:34:19 2011
@@ -18,7 +18,11 @@
 
 package org.apache.hadoop.mapred.lib;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -30,10 +34,10 @@ import org.apache.hadoop.mapred.FileOutp
 import org.apache.hadoop.mapred.HadoopTestCase;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.Utils;
 
 
 public class TestKeyFieldBasedComparator extends HadoopTestCase {
@@ -94,7 +98,7 @@ public class TestKeyFieldBasedComparator
     }
     Path[] outputFiles = FileUtil.stat2Paths(
         getFileSystem().listStatus(outDir,
-        new OutputLogFilter()));
+        new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=1077033&r1=1077032&r2=1077033&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java Fri Mar  4 03:34:19 2011
@@ -20,30 +20,30 @@ package org.apache.hadoop.mapred.pipes;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.List;
 import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
-import junit.framework.TestCase;
-
 public class TestPipes extends TestCase {
   private static final Log LOG =
     LogFactory.getLog(TestPipes.class.getName());
@@ -180,7 +180,7 @@ public class TestPipes extends TestCase 
 
     List<String> results = new ArrayList<String>();
     for (Path p:FileUtil.stat2Paths(dfs.getFileSystem().listStatus(outputPath,
-    		                        new OutputLogFilter()))) {
+        new Utils.OutputFileUtils.OutputFilesFilter()))) {
       results.add(TestMiniMRWithDFS.readOutput(p, job));
     }
     assertEquals("number of reduces is wrong", 



Mime
View raw message