hadoop-mapreduce-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r827947 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/examples/org/apache/hadoop/examples/terasort/ src/java/org/apache/hadoop/mapr...
Date: Wed, 21 Oct 2009 10:00:19 GMT
Author: acmurthy
Date: Wed Oct 21 10:00:17 2009
New Revision: 827947

URL: http://svn.apache.org/viewvc?rev=827947&view=rev
Log:
MAPREDUCE-947. Added commitJob and abortJob apis to OutputCommitter. Enhanced FileOutputCommitter to create a _SUCCESS file for successful jobs. Contributed by Amar Kamat & Jothi Padmanabhan.  
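
[Editor's note: the _SUCCESS marker gives downstream consumers a cheap way to tell a fully committed output directory from a partial one. A minimal sketch of such a check; the CheckJobOutput class and its argument handling are illustrative, not part of this commit.]

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CheckJobOutput {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path outputDir = new Path(args[0]);  // the job's output directory
        FileSystem fs = outputDir.getFileSystem(conf);
        // FileOutputCommitter.commitJob() creates this zero-length marker
        // only when the job's final runstate is SUCCEEDED.
        if (fs.exists(new Path(outputDir, "_SUCCESS"))) {
          System.out.println("Job committed; output is safe to consume.");
        } else {
          System.out.println("No _SUCCESS marker; output may be incomplete.");
        }
      }
    }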

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Utils.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestMultiMaps.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamDataProtocol.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingKeyValue.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/OutputCommitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/OutputLogFilter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
    hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobName.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestDatamerge.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestValueIterReset.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinDatamerge.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Oct 21 10:00:17 2009
@@ -13,6 +13,10 @@
 
     MAPREDUCE-906. Update Sqoop documentation. (Aaron Kimball via cdouglas)
 
+    MAPREDUCE-947. Added commitJob and abortJob apis to OutputCommitter.
+    Enhanced FileOutputCommitter to create a _SUCCESS file for successful
+    jobs. (Amar Kamat & Jothi Padmanabhan via acmurthy) 
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestMultiMaps.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestMultiMaps.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestMultiMaps.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestMultiMaps.java Wed Oct 21 10:00:17 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import org.apache.hadoop.sqoop.ImportOptions.InvalidOptionsException;
@@ -88,7 +89,9 @@
     conf.set("fs.default.name", "file:///");
     FileSystem fs = FileSystem.get(conf);
 
-    FileStatus [] stats = fs.listStatus(getTablePath());
+    FileStatus [] stats = fs.listStatus(getTablePath(),
+        new Utils.OutputFileUtils.OutputFilesFilter());
+
     for (FileStatus stat : stats) {
       paths.add(stat.getPath());
     }

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java Wed Oct 21 10:00:17 2009
@@ -34,7 +34,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 /**
  * This test case tests the symlink creation
@@ -123,7 +123,8 @@
         String line2 = null;
         Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
                                      new Path(OUTPUT_DIR),
-                                     new OutputLogFilter()));
+                                     new Utils.OutputFileUtils
+                                              .OutputFilesFilter()));
         for (int i = 0; i < fileList.length; i++){
           System.out.println(fileList[i].toString());
           BufferedReader bread =

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java Wed Oct 21 10:00:17 2009
@@ -24,6 +24,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 
 import junit.framework.TestCase;
 
@@ -65,7 +66,7 @@
   public void testCommandLine() throws Exception {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -86,10 +87,8 @@
       System.err.println("  out1=" + output);
       assertEquals(outputExpect, output);
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
       INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
     }
   }
 }
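
[Editor's note: the recurring test change above swaps File.delete() for FileUtil.fullyDelete() because java.io.File#delete() refuses to remove a non-empty directory, and the output directory now also holds the _SUCCESS marker. A standalone sketch of the pattern, with an illustrative path; at this revision fullyDelete(File) declares IOException, which is why the tests wrap it in try/catch.]

    import java.io.File;
    import java.io.IOException;

    import org.apache.hadoop.fs.FileUtil;

    public class CleanOutputDir {
      public static void main(String[] args) throws IOException {
        // Deleting the directory entry alone would fail while part files,
        // .crc files or _SUCCESS remain; fullyDelete removes the whole tree.
        File outDir = new File("/tmp/streaming-out").getAbsoluteFile();
        FileUtil.fullyDelete(outDir);
      }
    }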

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java Wed Oct 21 10:00:17 2009
@@ -21,6 +21,7 @@
 import junit.framework.TestCase;
 import java.io.*;
 
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapreduce.JobContext;
 
 /**
@@ -72,7 +73,7 @@
   {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -92,10 +93,12 @@
     } catch(Exception e) {
       failTrace(e);
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (IOException e) {
+        failTrace(e);
+      }
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamDataProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamDataProtocol.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamDataProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamDataProtocol.java Wed Oct 21 10:00:17 2009
@@ -23,6 +23,7 @@
 import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
 
@@ -82,7 +83,7 @@
   {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -103,10 +104,12 @@
     } catch(Exception e) {
       failTrace(e);
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (IOException e) {
+        failTrace(e);
+      }
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java Wed Oct 21 10:00:17 2009
@@ -23,6 +23,7 @@
 import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -75,7 +76,7 @@
     File outFile = null;
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -94,11 +95,12 @@
     } catch(Exception e) {
       failTrace(e);
     } finally {
-      outFile.delete();
-      File outFileCRC = new File(OUTPUT_DIR, "."+outFileName+".crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (IOException e) {
+        failTrace(e);
+      }
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java Wed Oct 21 10:00:17 2009
@@ -22,6 +22,8 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileUtil;
+
 /**
  * This class tests StreamXmlRecordReader
  * The test creates an XML file, uses StreamXmlRecordReader and compares
@@ -61,7 +63,7 @@
   public void testCommandLine() {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
       createInput();
@@ -74,10 +76,12 @@
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
-      INPUT_FILE.delete();
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Wed Oct 21 10:00:17 2009
@@ -21,6 +21,8 @@
 import junit.framework.TestCase;
 import java.io.*;
 
+import org.apache.hadoop.fs.FileUtil;
+
 /**
  * This class tests hadoopStreaming in MapReduce local mode.
  */
@@ -73,7 +75,7 @@
   {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -91,10 +93,12 @@
       System.err.println("  out1=" + output);
       assertEquals(outputExpect, output);
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java Wed Oct 21 10:00:17 2009
@@ -38,9 +38,9 @@
 import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SkipBadRecords;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 public class TestStreamingBadRecords extends ClusterMapReduceTestCase
@@ -126,7 +126,7 @@
     badRecs.addAll(REDUCER_BAD_RECORDS);
     Path[] outputFiles = FileUtil.stat2Paths(
         getFileSystem().listStatus(getOutputDir(),
-        new OutputLogFilter()));
+        new Utils.OutputFileUtils.OutputFilesFilter()));
     
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java Wed Oct 21 10:00:17 2009
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Group;
@@ -38,7 +39,7 @@
   {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -62,10 +63,12 @@
       assertNotNull("Counter", counter);
       assertEquals(3, counter.getCounter());
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
     }
   }
   

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java Wed Oct 21 10:00:17 2009
@@ -21,6 +21,8 @@
 import junit.framework.TestCase;
 import java.io.*;
 
+import org.apache.hadoop.fs.FileUtil;
+
 /**
  * This class tests hadoopStreaming in MapReduce local mode by giving
  * empty input to mapper and the mapper generates nonempty output. Since map()
@@ -100,11 +102,13 @@
       outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
       outFile.delete();
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      SCRIPT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        SCRIPT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java Wed Oct 21 10:00:17 2009
@@ -75,7 +75,11 @@
     } catch(Exception e) {
       // Expecting an exception
     } finally {
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+      FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingKeyValue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingKeyValue.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingKeyValue.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingKeyValue.java Wed Oct 21 10:00:17 2009
@@ -21,6 +21,8 @@
 import junit.framework.TestCase;
 import java.io.*;
 
+import org.apache.hadoop.fs.FileUtil;
+
 /**
  * This class tests hadoopStreaming in MapReduce local mode.
  * This testcase looks at different cases of tab position in input. 
@@ -78,7 +80,7 @@
     File outFile = null;
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -97,12 +99,12 @@
     } catch(Exception e) {
       failTrace(e);
     } finally {
-      outFile.delete();
-      File outFileCRC = new File(OUTPUT_DIR,
-                          "." + outFileName + ".crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (IOException e) {
+        failTrace(e);
+      }
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java Wed Oct 21 10:00:17 2009
@@ -23,6 +23,7 @@
 import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -89,7 +90,7 @@
   {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -109,10 +110,12 @@
     } catch(Exception e) {
       failTrace(e);
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (IOException e) {
+        failTrace(e);
+      }
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java Wed Oct 21 10:00:17 2009
@@ -34,7 +34,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 /**
  * This test case tests the symlink creation
@@ -113,7 +113,8 @@
         String line = null;
         Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
                                                 new Path(OUTPUT_DIR),
-                                                new OutputLogFilter()));
+                                                new Utils.OutputFileUtils
+                                                         .OutputFilesFilter()));
         for (int i = 0; i < fileList.length; i++){
           System.out.println(fileList[i].toString());
           BufferedReader bread =

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java Wed Oct 21 10:00:17 2009
@@ -24,6 +24,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 
 import junit.framework.TestCase;
 
@@ -63,7 +64,7 @@
   public void testCommandLine() throws Exception {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -84,10 +85,8 @@
       System.err.println("  out1=" + output);
       assertEquals(outputExpect, output);
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
       INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
     }
   }
 }

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java Wed Oct 21 10:00:17 2009
@@ -106,7 +106,7 @@
   public static class TeraOutputCommitter extends FileOutputCommitter {
 
     @Override
-    public void cleanupJob(JobContext jobContext) {
+    public void commitJob(JobContext jobContext) {
     }
 
     @Override

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java Wed Oct 21 10:00:17 2009
@@ -26,6 +26,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.util.StringUtils;
 
 /** An {@link OutputCommitter} that commits files specified 
@@ -39,6 +40,9 @@
    * Temporary directory name 
    */
   public static final String TEMP_DIR_NAME = "_temporary";
+  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 
+    "mapreduce.fileoutputcommitter.marksuccessfuljobs";
 
   public void setupJob(JobContext context) throws IOException {
     JobConf conf = context.getJobConf();
@@ -52,7 +56,38 @@
     }
   }
 
-  public void cleanupJob(JobContext context) throws IOException {
+  // True if the job requires output.dir marked on successful job.
+  // Note that by default it is set to true.
+  private boolean shouldMarkOutputDir(JobConf conf) {
+    return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
+  }
+  
+  public void commitJob(JobContext context) throws IOException {
+    // delete the _temporary folder in the output folder
+    cleanup(context);
+    // check if the output-dir marking is required
+    if (shouldMarkOutputDir(context.getJobConf())) {
+      // create a _success file in the output folder
+      markOutputDirSuccessful(context);
+    }
+  }
+  
+  // Create a _success file in the job's output folder
+  private void markOutputDirSuccessful(JobContext context) throws IOException {
+    JobConf conf = context.getJobConf();
+    // get the o/p path
+    Path outputPath = FileOutputFormat.getOutputPath(conf);
+    if (outputPath != null) {
+      // get the filesys
+      FileSystem fileSys = outputPath.getFileSystem(conf);
+      // create a file in the output folder to mark the job completion
+      Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+      fileSys.create(filePath).close();
+    }
+  }
+  
+  // Deletes the _temporary folder in the job's output dir.
+  private void cleanup(JobContext context) throws IOException {
     JobConf conf = context.getJobConf();
     // do the clean up of temporary directory
     Path outputPath = FileOutputFormat.getOutputPath(conf);
@@ -62,10 +97,19 @@
       context.getProgressible().progress();
       if (fileSys.exists(tmpDir)) {
         fileSys.delete(tmpDir, true);
+      } else {
+        LOG.warn("Output Path is Null in cleanup");
       }
     }
   }
 
+  @Override
+  public void abortJob(JobContext context, int runState) 
+  throws IOException {
+    // simply delete the _temporary dir from the o/p folder of the job
+    cleanup(context);
+  }
+  
   public void setupTask(TaskAttemptContext context) throws IOException {
     // FileOutputCommitter's setupTask doesn't do anything. Because the
     // temporary task directory is created on demand when the 
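
[Editor's note: the SUCCESSFUL_JOB_OUTPUT_DIR_MARKER key added above lets job authors opt out of the marker. A sketch of the submitting side; only the key name and its default of true come from this patch, the surrounding class is illustrative.]

    import org.apache.hadoop.mapred.JobConf;

    public class DisableSuccessMarker {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Defaults to true; false keeps commitJob() from creating _SUCCESS
        // while the _temporary directory is still cleaned up.
        conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
                        false);
      }
    }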

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Oct 21 10:00:17 2009
@@ -47,6 +47,7 @@
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
@@ -1344,6 +1345,16 @@
       Task result = tip.getTaskToRun(tts.getTrackerName());
       if (result != null) {
         addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+        if (jobFailed) {
+          result.setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus
+                .State.FAILED);
+        } else if (jobKilled) {
+          result.setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus
+                .State.KILLED);
+        } else {
+          result.setJobCleanupTaskState(org.apache.hadoop.mapreduce
+                .JobStatus.State.SUCCEEDED);
+        }
       }
       return result;
     }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java Wed Oct 21 10:00:17 2009
@@ -317,4 +317,9 @@
      return super.getReduceProgress(); 
    }
 
+   // A utility to convert new job runstates to the old ones.
+   static int getOldNewJobRunState(
+     org.apache.hadoop.mapreduce.JobStatus.State state) {
+     return state.getValue();
+   }
 }
\ No newline at end of file

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Wed Oct 21 10:00:17 2009
@@ -310,7 +310,7 @@
           }
         }
         // delete the temporary directory in output directory
-        outputCommitter.cleanupJob(jContext);
+        outputCommitter.commitJob(jContext);
         status.setCleanupProgress(1.0f);
 
         if (killed) {
@@ -323,7 +323,8 @@
 
       } catch (Throwable t) {
         try {
-          outputCommitter.cleanupJob(jContext);
+          outputCommitter.abortJob(jContext, 
+            org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
         } catch (IOException ioe) {
           LOG.info("Error cleaning up job:" + id);
         }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/OutputCommitter.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/OutputCommitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/OutputCommitter.java Wed Oct 21 10:00:17 2009
@@ -71,10 +71,38 @@
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException
+   * @deprecated Use {@link #commitJob(JobContext)} or 
+   *                 {@link #abortJob(JobContext, int)} instead.
    */
-  public abstract void cleanupJob(JobContext jobContext) throws IOException;
+  @Deprecated
+  public void cleanupJob(JobContext jobContext) throws IOException { }
 
   /**
+   * For committing job's output after successful job completion. Note that this
+   * is invoked for jobs with final runstate as SUCCESSFUL.	
+   * 
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException 
+   */
+  public void commitJob(JobContext jobContext) throws IOException {
+    cleanupJob(jobContext);
+  }
+  
+  /**
+   * For aborting an unsuccessful job's output. Note that this is invoked for 
+   * jobs with final runstate as {@link JobStatus#FAILED} or 
+   * {@link JobStatus#KILLED}
+   * 
+   * @param jobContext Context of the job whose output is being written.
+   * @param status final runstate of the job
+   * @throws IOException
+   */
+  public void abortJob(JobContext jobContext, int status) 
+  throws IOException {
+    cleanupJob(jobContext);
+  }
+  
+  /**
    * Sets up output for the task.
    * 
    * @param taskContext Context of the task whose output is being written.
@@ -128,8 +156,12 @@
    * This method implements the new interface by calling the old method. Note
    * that the input types are different between the new and old apis and this
    * is a bridge between the two.
+   * @deprecated Use {@link #commitJob(org.apache.hadoop.mapreduce.JobContext)}
+   *             or {@link #abortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)}
+   *             instead.
    */
   @Override
+  @Deprecated
   public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context
                                ) throws IOException {
     cleanupJob((JobContext) context);
@@ -141,6 +173,33 @@
    * is a bridge between the two.
    */
   @Override
+  public final void commitJob(org.apache.hadoop.mapreduce.JobContext context
+                             ) throws IOException {
+    commitJob((JobContext) context);
+  }
+  
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
+  public final void abortJob(org.apache.hadoop.mapreduce.JobContext context, 
+		                   org.apache.hadoop.mapreduce.JobStatus.State runState) 
+  throws IOException {
+    int state = JobStatus.getOldNewJobRunState(runState);
+    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
+      throw new IOException ("Invalid job run state : " + runState.name());
+    }
+    abortJob((JobContext) context, state);
+  }
+  
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
   public final 
   void setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
                  ) throws IOException {
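
[Editor's note: with this change a committer can distinguish success from failure without inspecting job state itself, while commitJob and abortJob default to the deprecated cleanupJob so existing subclasses keep working. A hedged sketch against the old mapred API; the AuditingOutputCommitter class and its behavior are hypothetical.]

    import java.io.IOException;

    import org.apache.hadoop.mapred.JobContext;
    import org.apache.hadoop.mapred.OutputCommitter;
    import org.apache.hadoop.mapred.TaskAttemptContext;

    public class AuditingOutputCommitter extends OutputCommitter {
      public void setupJob(JobContext context) throws IOException { }

      // Invoked once, only when the job's final runstate is SUCCEEDED.
      public void commitJob(JobContext context) throws IOException {
        // publish output, write markers, notify consumers, ...
      }

      // Invoked for final runstates FAILED or KILLED; the old api passes
      // the runstate as an int (JobStatus.FAILED or JobStatus.KILLED).
      public void abortJob(JobContext context, int runState)
          throws IOException {
        // discard partial output
      }

      public void setupTask(TaskAttemptContext context) throws IOException { }
      public boolean needsTaskCommit(TaskAttemptContext context)
          throws IOException {
        return false;
      }
      public void commitTask(TaskAttemptContext context) throws IOException { }
      public void abortTask(TaskAttemptContext context) throws IOException { }
    }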

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/OutputLogFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/OutputLogFilter.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/OutputLogFilter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/OutputLogFilter.java Wed Oct 21 10:00:17 2009
@@ -27,9 +27,14 @@
  * This can be used to list paths of output directory as follows:
  *   Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
  *                                   new OutputLogFilter()));
+ * @deprecated Use 
+ *   {@link org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputLogFilter} 
+ *   instead.
  */
 public class OutputLogFilter implements PathFilter {
+  private static final PathFilter LOG_FILTER = 
+    new Utils.OutputFileUtils.OutputLogFilter();
   public boolean accept(Path path) {
-    return !(path.toString().contains("_logs"));
+    return LOG_FILTER.accept(path);
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Oct 21 10:00:17 2009
@@ -42,9 +42,11 @@
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
@@ -129,6 +131,7 @@
   private TaskAttemptID taskId;                   // unique, includes job id
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
+  protected JobStatus.State jobRunStateForCleanup;
   protected boolean jobCleanup = false;
   protected boolean jobSetup = false;
   protected boolean taskCleanup = false;
@@ -322,6 +325,14 @@
     return jobCleanup;
   }
 
+  boolean isJobAbortTask() {
+    // the task is an abort task if its marked for cleanup and the final 
+    // expected state is either failed or killed.
+    return isJobCleanupTask() 
+           && (jobRunStateForCleanup == JobStatus.State.KILLED 
+               || jobRunStateForCleanup == JobStatus.State.FAILED);
+  }
+  
   boolean isJobSetupTask() {
     return jobSetup;
   }
@@ -334,6 +345,14 @@
     jobCleanup = true; 
   }
 
+  /**
+   * Sets the task to do job abort in the cleanup.
+   * @param status the final runstate of the job. 
+   */
+  void setJobCleanupTaskState(JobStatus.State status) {
+    jobRunStateForCleanup = status;
+  }
+  
   boolean isMapOrReduce() {
     return !jobSetup && !jobCleanup && !taskCleanup;
   }
@@ -362,6 +381,9 @@
     skipRanges.write(out);
     out.writeBoolean(skipping);
     out.writeBoolean(jobCleanup);
+    if (jobCleanup) {
+      WritableUtils.writeEnum(out, jobRunStateForCleanup);
+    }
     out.writeBoolean(jobSetup);
     out.writeBoolean(writeSkipRecs);
     out.writeBoolean(taskCleanup);
@@ -379,6 +401,10 @@
     currentRecStartIndex = currentRecIndexIterator.next();
     skipping = in.readBoolean();
     jobCleanup = in.readBoolean();
+    if (jobCleanup) {
+      jobRunStateForCleanup = 
+        WritableUtils.readEnum(in, JobStatus.State.class);
+    }
     jobSetup = in.readBoolean();
     writeSkipRecs = in.readBoolean();
     taskCleanup = in.readBoolean();
@@ -872,7 +898,27 @@
     getProgress().setStatus("cleanup");
     statusUpdate(umbilical);
     // do the cleanup
-    committer.cleanupJob(jobContext);
+    LOG.info("Cleaning up job");
+    if (jobRunStateForCleanup == JobStatus.State.FAILED 
+        || jobRunStateForCleanup == JobStatus.State.KILLED) {
+      LOG.info("Aborting job with runstate : " + jobRunStateForCleanup.name());
+      if (conf.getUseNewMapper()) {
+        committer.abortJob(jobContext, jobRunStateForCleanup);
+      } else {
+        org.apache.hadoop.mapred.OutputCommitter oldCommitter = 
+          (org.apache.hadoop.mapred.OutputCommitter)committer;
+        oldCommitter.abortJob(jobContext, jobRunStateForCleanup);
+      }
+    } else if (jobRunStateForCleanup == JobStatus.State.SUCCEEDED){
+      LOG.info("Committing job");
+      committer.commitJob(jobContext);
+    } else {
+      throw new IOException("Invalid state of the job for cleanup. State found "
+                            + jobRunStateForCleanup + " expecting "
+                            + JobStatus.State.SUCCEEDED + ", " 
+                            + JobStatus.State.FAILED + " or "
+                            + JobStatus.State.KILLED);
+    }
     done(umbilical, reporter);
   }
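
[Editor's note: the cleanup task learns the job's final runstate over the wire — write() appends the enum right after the jobCleanup flag and readFields() mirrors it. A small round-trip sketch of the WritableUtils helpers used here; the buffer plumbing is illustrative.]

    import java.io.IOException;

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.WritableUtils;
    import org.apache.hadoop.mapreduce.JobStatus;

    public class RunStateRoundTrip {
      public static void main(String[] args) throws IOException {
        DataOutputBuffer out = new DataOutputBuffer();
        // Serialized as the enum's name, just as Task.write() does.
        WritableUtils.writeEnum(out, JobStatus.State.KILLED);

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        JobStatus.State state =
            WritableUtils.readEnum(in, JobStatus.State.class);
        System.out.println("Deserialized runstate: " + state);  // KILLED
      }
    }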
 

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Utils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Utils.java?rev=827947&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Utils.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Utils.java Wed Oct 21 10:00:17 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * A utility class. It provides
+ *   A path filter utility to filter out output/part files in the output dir
+ */
+public class Utils {
+  public static class OutputFileUtils {
+    /**
+     * This class filters output(part) files from the given directory
+     * It does not accept files with filenames _logs and _SUCCESS.
+     * This can be used to list paths of output directory as follows:
+     *   Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+     *                                         new OutputFilesFilter()));
+     */
+    public static class OutputFilesFilter extends OutputLogFilter {
+      public boolean accept(Path path) {
+        return super.accept(path) 
+               && !FileOutputCommitter.SUCCEEDED_FILE_NAME
+                   .equals(path.getName());
+      }
+    }
+    
+    /**
+     * This class filters log files from directory given
+     * It doesnt accept paths having _logs.
+     * This can be used to list paths of output directory as follows:
+     *   Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+     *                                   new OutputLogFilter()));
+     */
+    public static class OutputLogFilter implements PathFilter {
+      public boolean accept(Path path) {
+        return !(path.toString().contains("_logs"));
+      }
+    }
+  }
+}
\ No newline at end of file
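
[Editor's note: listing a job's real output with the new filter looks roughly like the sketch below; the ListPartFiles class is illustrative, while the filter itself is the one added above.]

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.Utils;

    public class ListPartFiles {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path outDir = new Path(args[0]);
        FileSystem fs = outDir.getFileSystem(conf);
        // OutputFilesFilter extends OutputLogFilter, so it skips both the
        // _logs directory and the new _SUCCESS marker.
        FileStatus[] parts = fs.listStatus(outDir,
            new Utils.OutputFileUtils.OutputFilesFilter());
        for (FileStatus status : parts) {
          System.out.println(status.getPath());
        }
      }
    }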

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java Wed Oct 21 10:00:17 2009
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapreduce;
 
 import java.io.IOException;
-
 /**
  * <code>OutputCommitter</code> describes the commit of task output for a 
  * Map-Reduce job.
@@ -69,9 +68,38 @@
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException
+   * @deprecated Use {@link #commitJob(JobContext)} or
+   *                 {@link #abortJob(JobContext, JobStatus.State)} instead.
+   */
+  @Deprecated
+  public void cleanupJob(JobContext jobContext) throws IOException { }
+
+  /**
+   * For committing job's output after successful job completion. Note that this
+   * is invoked for jobs with final runstate as SUCCESSFUL.	
+   * 
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException
    */
-  public abstract void cleanupJob(JobContext jobContext) throws IOException;
+  public void commitJob(JobContext jobContext) throws IOException {
+    cleanupJob(jobContext);
+  }
 
+  
+  /**
+   * For aborting an unsuccessful job's output. Note that this is invoked for 
+   * jobs with final runstate as {@link JobStatus.State#FAILED} or 
+   * {@link JobStatus.State#KILLED}.
+   *
+   * @param jobContext Context of the job whose output is being written.
+   * @param state final runstate of the job
+   * @throws IOException
+   */
+  public void abortJob(JobContext jobContext, JobStatus.State state) 
+  throws IOException {
+    cleanupJob(jobContext);
+  }
+  
   /**
    * Sets up output for the task.
    * 
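
[Editor's note: backward compatibility works the same way in both APIs — the new commitJob and abortJob default to calling the deprecated cleanupJob, so a committer written before this patch still runs its hook in every terminal state. A sketch assuming a hypothetical LegacyCommitter.]

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class LegacyCommitter extends OutputCommitter {
      // Runs for success and failure alike: the default commitJob() and
      // abortJob() implementations above both delegate here.
      @Override
      public void cleanupJob(JobContext context) throws IOException {
        // delete scratch space, close external resources, ...
      }

      public void setupJob(JobContext context) throws IOException { }
      public void setupTask(TaskAttemptContext context) throws IOException { }
      public boolean needsTaskCommit(TaskAttemptContext context)
          throws IOException {
        return false;
      }
      public void commitTask(TaskAttemptContext context) throws IOException { }
      public void abortTask(TaskAttemptContext context) throws IOException { }
    }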

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Wed Oct 21 10:00:17 2009
@@ -23,10 +23,12 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -43,6 +45,9 @@
    * Temporary directory name 
    */
   protected static final String TEMP_DIR_NAME = "_temporary";
+  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 
+    "mapreduce.fileoutputcommitter.marksuccessfuljobs";
   private FileSystem outputFileSystem = null;
   private Path outputPath = null;
   private Path workPath = null;
@@ -80,21 +85,59 @@
     }
   }
 
+  // True if the job requires output.dir marked on successful job.
+  // Note that by default it is set to true.
+  private boolean shouldMarkOutputDir(Configuration conf) {
+    return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
+  }
+  
+  // Create a _success file in the job's output dir
+  private void markOutputDirSuccessful(JobContext context) throws IOException {
+    if (outputPath != null) {
+      // create a file in the output folder to mark the job completion
+      Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+      outputFileSystem.create(filePath).close();
+    }
+  }
+  
   /**
    * Delete the temporary directory, including all of the work directories.
+   * Create a _SUCCESS file to make it as successful.
    * @param context the job's context
    */
-  public void cleanupJob(JobContext context) throws IOException {
+  public void commitJob(JobContext context) throws IOException {
+    // delete the _temporary folder and create a _done file in the o/p folder
+    cleanup(context);
+    if (shouldMarkOutputDir(context.getConfiguration())) {
+      markOutputDirSuccessful(context);
+    }
+  }
+  
+  // Delete the _temporary folder in the output dir.
+  private void cleanup(JobContext context) throws IOException {
     if (outputPath != null) {
       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
       FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
       if (fileSys.exists(tmpDir)) {
         fileSys.delete(tmpDir, true);
       }
+    } else {
+      LOG.warn("Output Path is null in cleanup");
     }
   }
 
   /**
+   * Delete the temporary directory, including all of the work directories.
+   * @param context the job's context
+   */
+  @Override
+  public void abortJob(JobContext context, JobStatus.State state) 
+  throws IOException {
+    // delete the _temporary folder
+    cleanup(context);
+  }
+  
+  /**
    * No task setup required.
    */
   @Override

Modified: hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml Wed Oct 21 10:00:17 2009
@@ -129,6 +129,15 @@
        <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
      </Match>
      <Match>
+       <Class name="org.apache.hadoop.mapred.OutputCommitter" />
+       <Or>
+       <Method name="abortJob" />
+       <Method name="commitJob" />
+       <Method name="cleanupJob" />
+       </Or>
+       <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
+     </Match>
+     <Match>
        <Class name="org.apache.hadoop.mapred.lib.db.DBInputFormat$DBRecordReader" />
        <Method name="next" />
        <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java Wed Oct 21 10:00:17 2009
@@ -84,7 +84,7 @@
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(outDir,
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

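The test changes that follow all make the same substitution: output listings move from OutputLogFilter to the new Utils.OutputFileUtils.OutputFilesFilter, so that side files such as the _SUCCESS marker are not mistaken for part files. A minimal sketch of the listing pattern, not part of this patch (the output path is illustrative):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.Utils;

    public class ListPartFiles {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path outDir = new Path("/user/example/job-output"); // illustrative path
        FileSystem fs = outDir.getFileSystem(conf);
        // Keep only real output (part-*) files; side files such as
        // _SUCCESS and _logs are filtered out.
        Path[] parts = FileUtil.stat2Paths(
            fs.listStatus(outDir, new Utils.OutputFileUtils.OutputFilesFilter()));
        for (Path p : parts) {
          BufferedReader in =
              new BufferedReader(new InputStreamReader(fs.open(p)));
          System.out.println(p + ": " + in.readLine());
          in.close();
        }
      }
    }
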
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Wed Oct 21 10:00:17 2009
@@ -257,7 +257,9 @@
    */
   protected void assertOwnerShip(Path outDir, FileSystem fs)
       throws IOException {
-    for (FileStatus status : fs.listStatus(outDir, new OutputLogFilter())) {
+    for (FileStatus status : fs.listStatus(outDir, 
+                                           new Utils.OutputFileUtils
+                                                    .OutputFilesFilter())) {
       String owner = status.getOwner();
       String group = status.getGroup();
       LOG.info("Ownership of the file is " + status.getPath() + " is " + owner

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java Wed Oct 21 10:00:17 2009
@@ -167,7 +167,7 @@
     
     Path[] outputFiles = FileUtil.stat2Paths(
         getFileSystem().listStatus(getOutputDir(),
-        new OutputLogFilter()));
+        new Utils.OutputFileUtils.OutputFilesFilter()));
     
     List<String> mapperOutput=getProcessed(input, mapperBadRecords);
     LOG.debug("mapperOutput " + mapperOutput.size());

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java Wed Oct 21 10:00:17 2009
@@ -64,7 +64,7 @@
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java Wed Oct 21 10:00:17 2009
@@ -50,11 +50,11 @@
 
   MiniMRCluster mr = null;
 
-  /** Committer with cleanup waiting on a signal
+  /** Committer with commit waiting on a signal
    */
-  static class CommitterWithDelayCleanup extends FileOutputCommitter {
+  static class CommitterWithDelayCommit extends FileOutputCommitter {
     @Override
-    public void cleanupJob(JobContext context) throws IOException {
+    public void commitJob(JobContext context) throws IOException {
       Configuration conf = context.getConfiguration();
       Path share = new Path(conf.get("share"));
       FileSystem fs = FileSystem.get(conf);
@@ -66,7 +66,7 @@
         }
         UtilsForTests.waitFor(100);
       }
-      super.cleanupJob(context);
+      super.commitJob(context);
     }
   }
 
@@ -103,7 +103,7 @@
     conf.setJobName("empty");
     // use an InputFormat which returns no split
     conf.setInputFormat(EmptyInputFormat.class);
-    conf.setOutputCommitter(CommitterWithDelayCleanup.class);
+    conf.setOutputCommitter(CommitterWithDelayCommit.class);
     conf.setOutputKeyClass(Text.class);
     conf.setOutputValueClass(IntWritable.class);
     conf.setMapperClass(IdentityMapper.class);
@@ -197,7 +197,8 @@
         + " and not 1.0", runningJob.cleanupProgress() == 1.0);
 
    assertTrue("Job output directory doesn't exist!", fs.exists(outDir));
-    FileStatus[] list = fs.listStatus(outDir, new OutputLogFilter());
+    FileStatus[] list = fs.listStatus(outDir, 
+                          new Utils.OutputFileUtils.OutputFilesFilter());
     assertTrue("Number of part-files is " + list.length + " and not "
         + numReduces, list.length == numReduces);
 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java Wed Oct 21 10:00:17 2009
@@ -75,7 +75,7 @@
       theRecordWriter.close(reporter);
     }
     committer.commitTask(tContext);
-    committer.cleanupJob(jContext);
+    committer.commitJob(jContext);
     
     File expectedFile = new File(new Path(outDir, file).toString());
     StringBuffer expectedOutput = new StringBuffer();

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java Wed Oct 21 10:00:17 2009
@@ -116,7 +116,8 @@
     JobClient.runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
-        fs.listStatus(OUTPUT_DIR, new OutputLogFilter()));
+        fs.listStatus(OUTPUT_DIR, 
+                      new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(1, outputFiles.length);
     InputStream is = fs.open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));
@@ -161,7 +162,8 @@
     JobClient.runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
-        fs.listStatus(OUTPUT_DIR, new OutputLogFilter()));
+        fs.listStatus(OUTPUT_DIR, 
+                      new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(1, outputFiles.length);
   }
 

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java?rev=827947&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java Wed Oct 21 10:00:17 2009
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+/**
+ * A JUnit test for Map-Reduce job cleanup.
+ */
+public class TestJobCleanup extends TestCase {
+  private static String TEST_ROOT_DIR =
+      new File(System.getProperty("test.build.data", "/tmp") + "/" 
+               + "test-job-cleanup").toString();
+  private static final String ABORT_KILLED_FILE_NAME = 
+    "_custom_abort_killed";
+  private static final String ABORT_FAILED_FILE_NAME = 
+    "_custom_abort_failed";
+  private static FileSystem fileSys = null;
+  private static MiniMRCluster mr = null;
+  private static Path inDir = null;
+  private static Path emptyInDir = null;
+  private static int outDirs = 0;
+  
+  public static Test suite() {
+    TestSetup setup = new TestSetup(new TestSuite(TestJobCleanup.class)) {
+      protected void setUp() throws Exception {
+        JobConf conf = new JobConf();
+        fileSys = FileSystem.get(conf);
+        fileSys.delete(new Path(TEST_ROOT_DIR), true);
+        conf.set("mapred.job.tracker.handler.count", "1");
+        conf.set("mapred.job.tracker", "127.0.0.1:0");
+        conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
+        conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");
+
+        mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+        inDir = new Path(TEST_ROOT_DIR, "test-input");
+        String input = "The quick brown fox\n" + "has many silly\n"
+                       + "red fox sox\n";
+        DataOutputStream file = fileSys.create(new Path(inDir, "part-" + 0));
+        file.writeBytes(input);
+        file.close();
+        emptyInDir = new Path(TEST_ROOT_DIR, "empty-input");
+        fileSys.mkdirs(emptyInDir);
+      }
+      
+      protected void tearDown() throws Exception {
+        if (fileSys != null) {
+          fileSys.close();
+        }
+        if (mr != null) {
+          mr.shutdown();
+        }
+      }
+    };
+    return setup;
+  }
+  
+  /** 
+   * Committer whose abortJob creates a _failed or _killed marker file
+   * in the output folder
+   */
+  static class CommitterWithCustomAbort extends FileOutputCommitter {
+    @Override
+    public void abortJob(JobContext context, int state) 
+    throws IOException {
+      JobConf conf = context.getJobConf();
+      Path outputPath = FileOutputFormat.getOutputPath(conf);
+      FileSystem fs = outputPath.getFileSystem(conf);
+      String fileName = (state == JobStatus.FAILED) 
+                        ? TestJobCleanup.ABORT_FAILED_FILE_NAME 
+                        : TestJobCleanup.ABORT_KILLED_FILE_NAME;
+      fs.create(new Path(outputPath, fileName)).close();
+    }
+  }
+  
+  private Path getNewOutputDir() {
+    return new Path(TEST_ROOT_DIR, "output-" + outDirs++);
+  }
+  
+  private void configureJob(JobConf jc, String jobName, int maps, int reds, 
+                            Path outDir) {
+    jc.setJobName(jobName);
+    jc.setInputFormat(TextInputFormat.class);
+    jc.setOutputKeyClass(LongWritable.class);
+    jc.setOutputValueClass(Text.class);
+    FileInputFormat.setInputPaths(jc, inDir);
+    FileOutputFormat.setOutputPath(jc, outDir);
+    jc.setMapperClass(IdentityMapper.class);
+    jc.setReducerClass(IdentityReducer.class);
+    jc.setNumMapTasks(maps);
+    jc.setNumReduceTasks(reds);
+  }
+  
+  // run a job with 1 map and let it run to completion
+  private void testSuccessfulJob(String filename, 
+    Class<? extends OutputCommitter> committer, String[] exclude) 
+  throws IOException {
+    JobConf jc = mr.createJobConf();
+    Path outDir = getNewOutputDir();
+    configureJob(jc, "job with cleanup()", 1, 0, outDir);
+    jc.setOutputCommitter(committer);
+    
+    JobClient jobClient = new JobClient(jc);
+    RunningJob job = jobClient.submitJob(jc);
+    JobID id = job.getID();
+    job.waitForCompletion();
+    
+    Path testFile = new Path(outDir, filename);
+    assertTrue("File " + testFile + " missing for successful job " + id, 
+               fileSys.exists(testFile));
+    
+    // check that files from the exclude set are absent
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for successful job " 
+                  + id, fileSys.exists(file));
+    }
+  }
+  
+  // run a job for which all the attempts simply fail.
+  private void testFailedJob(String fileName, 
+    Class<? extends OutputCommitter> committer, String[] exclude) 
+  throws IOException {
+    JobConf jc = mr.createJobConf();
+    Path outDir = getNewOutputDir();
+    configureJob(jc, "fail job with abort()", 1, 0, outDir);
+    jc.setMaxMapAttempts(1);
+    // set the job to fail
+    jc.setMapperClass(UtilsForTests.FailMapper.class);
+    jc.setOutputCommitter(committer);
+
+    JobClient jobClient = new JobClient(jc);
+    RunningJob job = jobClient.submitJob(jc);
+    JobID id = job.getID();
+    job.waitForCompletion();
+    
+    if (fileName != null) {
+      Path testFile = new Path(outDir, fileName);
+      assertTrue("File " + testFile + " missing for failed job " + id, 
+                 fileSys.exists(testFile));
+    }
+    
+    // check that files from the exclude set are absent
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for failed job "
+                  + id, fileSys.exists(file));
+    }
+  }
+  
+  // run a job whose mapper hangs, then kill it.
+  private void testKilledJob(String fileName,
+    Class<? extends OutputCommitter> committer, String[] exclude) 
+  throws IOException {
+    JobConf jc = mr.createJobConf();
+    Path outDir = getNewOutputDir();
+    configureJob(jc, "kill job with abort()", 1, 0, outDir);
+    // set the job to wait for long
+    jc.setMapperClass(UtilsForTests.KillMapper.class);
+    jc.setOutputCommitter(committer);
+    
+    JobClient jobClient = new JobClient(jc);
+    RunningJob job = jobClient.submitJob(jc);
+    JobID id = job.getID();
+    JobInProgress jip = 
+      mr.getJobTrackerRunner().getJobTracker().getJob(job.getID());
+    
+    // wait for the map to be launched
+    while (jip.runningMaps() != 1) {
+      UtilsForTests.waitFor(100);
+    }
+    
+    job.killJob(); // kill the job
+    
+    job.waitForCompletion(); // wait for the job to complete
+    
+    if (fileName != null) {
+      Path testFile = new Path(outDir, fileName);
+      assertTrue("File " + testFile + " missing for job " + id, 
+                 fileSys.exists(testFile));
+    }
+    
+    // check that files from the exclude set are absent
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for killed job "
+                  + id, fileSys.exists(file));
+    }
+  }
+  
+  /**
+   * Test default cleanup/abort behavior
+   * 
+   * @throws IOException
+   */
+  public void testDefaultCleanupAndAbort() throws IOException {
+    // check with a successful job
+    testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
+                      FileOutputCommitter.class,
+                      new String[] {});
+    
+    // check with a failed job
+    testFailedJob(null, 
+                  FileOutputCommitter.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+    
+    // check default abort job kill
+    testKilledJob(null, 
+                  FileOutputCommitter.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+  }
+  
+  /**
+   * Test if a failed job with custom committer runs the abort code.
+   * 
+   * @throws IOException
+   */
+  public void testCustomAbort() throws IOException {
+    // check with a successful job
+    testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME, 
+                      CommitterWithCustomAbort.class,
+                      new String[] {ABORT_FAILED_FILE_NAME, 
+                                    ABORT_KILLED_FILE_NAME});
+    
+    // check with a failed job
+    testFailedJob(ABORT_FAILED_FILE_NAME, CommitterWithCustomAbort.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
+                                ABORT_KILLED_FILE_NAME});
+    
+    // check with a killed job
+    testKilledJob(ABORT_KILLED_FILE_NAME, CommitterWithCustomAbort.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
+                                ABORT_FAILED_FILE_NAME});
+  }
+}
\ No newline at end of file

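The CommitterWithCustomAbort above exercises the old-API hook: abortJob receives the job's final run state as an int, so a committer can distinguish JobStatus.FAILED from JobStatus.KILLED. A distilled sketch of that override, not part of this patch, which also delegates to super.abortJob() so the _temporary directory is still cleaned up (marker names are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileOutputCommitter;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobContext;
    import org.apache.hadoop.mapred.JobStatus;

    public class MarkerOnAbortCommitter extends FileOutputCommitter {
      @Override
      public void abortJob(JobContext context, int state) throws IOException {
        super.abortJob(context, state); // removes the _temporary directory
        JobConf conf = context.getJobConf();
        Path outputPath = FileOutputFormat.getOutputPath(conf);
        FileSystem fs = outputPath.getFileSystem(conf);
        String marker = (state == JobStatus.FAILED) ? "_failed" : "_killed";
        fs.create(new Path(outputPath, marker)).close();
      }
    }
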
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobName.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobName.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobName.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobName.java Wed Oct 21 10:00:17 2009
@@ -61,7 +61,7 @@
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(1, outputFiles.length);
     InputStream is = getFileSystem().open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));
@@ -95,7 +95,7 @@
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(1, outputFiles.length);
     InputStream is = getFileSystem().open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java Wed Oct 21 10:00:17 2009
@@ -157,7 +157,7 @@
 
       Path[] fileList = 
         FileUtil.stat2Paths(fileSys.listStatus(output1,
-            new OutputLogFilter()));
+            new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         System.out.println("Test1 File list[" + i + "]" + ": "+ fileList[i]);
       }
@@ -169,7 +169,7 @@
 
       fileList =
         FileUtil.stat2Paths(fileSys.listStatus(output2,
-            new OutputLogFilter()));
+            new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         System.out.println("Test2 File list[" + i + "]" + ": "+ fileList[i]);
       }
@@ -182,7 +182,7 @@
 
       fileList =
         FileUtil.stat2Paths(fileSys.listStatus(output3,
-            new OutputLogFilter()));
+            new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         System.out.println("Test3 File list[" + i + "]" + ": "+ fileList[i]);
       }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java Wed Oct 21 10:00:17 2009
@@ -80,7 +80,7 @@
     {
       Path[] parents = FileUtil.stat2Paths(fs.listStatus(outDir.getParent()));
       Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-              new OutputLogFilter()));
+              new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         BufferedReader file = 
           new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
@@ -135,7 +135,8 @@
     JobClient.runJob(conf);
     StringBuffer result = new StringBuffer();
     Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-                                 new OutputLogFilter()));
+                                 new Utils.OutputFileUtils
+                                          .OutputFilesFilter()));
     for (int i = 0; i < fileList.length; ++i) {
       BufferedReader file = new BufferedReader(new InputStreamReader(
                                                                      fs.open(fileList[i])));

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Wed Oct 21 10:00:17 2009
@@ -107,7 +107,8 @@
     {
       
       Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-                                   new OutputLogFilter()));
+                                   new Utils.OutputFileUtils
+                                            .OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         LOG.info("File list[" + i + "]" + ": "+ fileList[i]);
         BufferedReader file = 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java Wed Oct 21 10:00:17 2009
@@ -49,16 +49,16 @@
     }
   }
 
-  // Commiter with cleanupJob throwing exception
-  static class CommitterWithFailCleanup extends FileOutputCommitter {
+  // Committer with commitJob throwing exception
+  static class CommitterWithFailCommit extends FileOutputCommitter {
     @Override
-    public void cleanupJob(JobContext context) throws IOException {
+    public void commitJob(JobContext context) throws IOException {
       throw new IOException();
     }
   }
 
   // Committer waits for a file to be created on dfs.
-  static class CommitterWithLongSetupAndCleanup extends FileOutputCommitter {
+  static class CommitterWithLongSetupAndCommit extends FileOutputCommitter {
     
     private void waitForSignalFile(FileSystem fs, Path signalFile) 
     throws IOException {
@@ -78,9 +78,9 @@
     }
     
     @Override
-    public void cleanupJob(JobContext context) throws IOException {
+    public void commitJob(JobContext context) throws IOException {
       waitForSignalFile(FileSystem.get(context.getJobConf()), cleanupSignalFile);
-      super.cleanupJob(context);
+      super.commitJob(context);
     }
   }
 
@@ -123,7 +123,7 @@
   throws IOException {
     // launch job with waiting setup/cleanup
     JobConf jobConf = mr.createJobConf();
-    jobConf.setOutputCommitter(CommitterWithLongSetupAndCleanup.class);
+    jobConf.setOutputCommitter(CommitterWithLongSetupAndCommit.class);
     RunningJob job = UtilsForTests.runJob(jobConf, inDir, outDir);
     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
     JobInProgress jip = jt.getJob(job.getID());
@@ -238,7 +238,7 @@
                              null, null, jtConf);
       // test setup/cleanup throwing exceptions
       testFailCommitter(CommitterWithFailSetup.class, mr.createJobConf());
-      testFailCommitter(CommitterWithFailCleanup.class, mr.createJobConf());
+      testFailCommitter(CommitterWithFailCommit.class, mr.createJobConf());
       // test the command-line kill for setup/cleanup attempts. 
       testSetupAndCleanupKill(mr, dfs, true);
       // remove setup/cleanup signal files.

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java Wed Oct 21 10:00:17 2009
@@ -152,7 +152,7 @@
 
   private void verifyOutput(FileSystem fs, Path outDir) throws IOException {
     Path[] outputFiles = FileUtil.stat2Paths(
-        fs.listStatus(outDir, new OutputLogFilter()));
+        fs.listStatus(outDir, new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(numReduces, outputFiles.length);
     InputStream is = fs.open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java Wed Oct 21 10:00:17 2009
@@ -101,7 +101,8 @@
     RunningJob runningJob = JobClient.runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
-        fs.listStatus(OUTPUT_DIR, new OutputLogFilter()));
+        fs.listStatus(OUTPUT_DIR, 
+                      new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = fs.open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestDatamerge.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestDatamerge.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestDatamerge.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestDatamerge.java Wed Oct 21 10:00:17 2009
@@ -50,6 +50,7 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -317,7 +318,8 @@
     job.setOutputFormat(SequenceFileOutputFormat.class);
     JobClient.runJob(job);
 
-    FileStatus[] outlist = cluster.getFileSystem().listStatus(outf);
+    FileStatus[] outlist = cluster.getFileSystem().listStatus(outf, 
+                             new Utils.OutputFileUtils.OutputFilesFilter());
     assertEquals(1, outlist.length);
     assertTrue(0 < outlist[0].getLen());
     SequenceFile.Reader r =

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java Wed Oct 21 10:00:17 2009
@@ -31,10 +31,10 @@
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.Utils;
 
 
 public class TestKeyFieldBasedComparator extends HadoopTestCase {
@@ -95,7 +95,7 @@
     }
     Path[] outputFiles = FileUtil.stat2Paths(
         getFileSystem().listStatus(outDir,
-        new OutputLogFilter()));
+        new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=827947&r1=827946&r2=827947&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java Wed Oct 21 10:00:17 2009
@@ -37,9 +37,9 @@
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
@@ -197,7 +197,8 @@
 
     List<String> results = new ArrayList<String>();
     for (Path p:FileUtil.stat2Paths(dfs.getFileSystem().listStatus(outputPath,
-    		                        new OutputLogFilter()))) {
+    		                        new Utils.OutputFileUtils
+    		                                 .OutputFilesFilter()))) {
       results.add(TestMiniMRWithDFS.readOutput(p, job));
     }
     assertEquals("number of reduces is wrong", 


