hadoop-mapreduce-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r834284 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/lib/output/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapreduce/lib/output/
Date: Tue, 10 Nov 2009 00:19:43 GMT
Author: acmurthy
Date: Tue Nov 10 00:19:42 2009
New Revision: 834284

URL: http://svn.apache.org/viewvc?rev=834284&view=rev
Log:
MAPREDUCE-1196. Fix FileOutputCommitter to use the deprecated cleanupJob api correctly.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java
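
The fix routes FileOutputCommitter.commitJob() and abortJob() through the public, deprecated cleanupJob() rather than a private cleanup() helper, so a subclass that customizes only the deprecated hook still has its cleanup invoked. A minimal sketch of such a legacy committer against the old mapred API follows; the class name LegacyCleanupCommitter and the _legacy_cleanup marker file are illustrative only and not part of this patch (the patch's own tests use CommitterWithCustomDeprecatedCleanup for the same check):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileOutputCommitter;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobContext;

    // Sketch only: a committer that customizes job cleanup solely through the
    // deprecated cleanupJob() hook. With this patch, commitJob()/abortJob()
    // delegate to cleanupJob(), so this override runs; previously the private
    // cleanup() helper bypassed it.
    public class LegacyCleanupCommitter extends FileOutputCommitter {
      @Override
      public void cleanupJob(JobContext context) throws IOException {
        JobConf conf = context.getJobConf();
        Path outputPath = FileOutputFormat.getOutputPath(conf);
        FileSystem fs = outputPath.getFileSystem(conf);
        // Illustrative marker file written by the custom cleanup.
        fs.create(new Path(outputPath, "_legacy_cleanup")).close();
        // Keep the default behaviour of deleting the _temporary directory.
        super.cleanupJob(context);
      }
    }

The new testCustomCleanup cases in TestJobCleanup and TestJobOutputCommitter below verify this behaviour for successful, failed and killed jobs.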

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=834284&r1=834283&r2=834284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Nov 10 00:19:42 2009
@@ -74,6 +74,9 @@
 
     MAPREDUCE-665. Move libhdfs to HDFS subproject. (Eli Collins via dhruba)
 
+    MAPREDUCE-1196. Fix FileOutputCommitter to use the deprecated cleanupJob
+    api correctly. (acmurthy)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=834284&r1=834283&r2=834284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java Tue Nov 10 00:19:42 2009
@@ -64,7 +64,7 @@
   
   public void commitJob(JobContext context) throws IOException {
     // delete the _temporary folder in the output folder
-    cleanup(context);
+    cleanupJob(context);
     // check if the output-dir marking is required
     if (shouldMarkOutputDir(context.getJobConf())) {
       // create a _success file in the output folder
@@ -85,9 +85,10 @@
       fileSys.create(filePath).close();
     }
   }
-  
-  // Deletes the _temporary folder in the job's output dir.
-  private void cleanup(JobContext context) throws IOException {
+
+  @Override
+  @Deprecated
+  public void cleanupJob(JobContext context) throws IOException {
     JobConf conf = context.getJobConf();
     // do the clean up of temporary directory
     Path outputPath = FileOutputFormat.getOutputPath(conf);
@@ -107,7 +108,7 @@
   public void abortJob(JobContext context, int runState) 
   throws IOException {
     // simply delete the _temporary dir from the o/p folder of the job
-    cleanup(context);
+    cleanupJob(context);
   }
   
   public void setupTask(TaskAttemptContext context) throws IOException {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=834284&r1=834283&r2=834284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Tue Nov 10 00:19:42 2009
@@ -107,14 +107,15 @@
    */
   public void commitJob(JobContext context) throws IOException {
     // delete the _temporary folder and create a _done file in the o/p folder
-    cleanup(context);
+    cleanupJob(context);
     if (shouldMarkOutputDir(context.getConfiguration())) {
       markOutputDirSuccessful(context);
     }
   }
-  
-  // Delete the _temporary folder in the output dir.
-  private void cleanup(JobContext context) throws IOException {
+
+  @Override
+  @Deprecated
+  public void cleanupJob(JobContext context) throws IOException {
     if (outputPath != null) {
       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
       FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
@@ -134,7 +135,7 @@
   public void abortJob(JobContext context, JobStatus.State state) 
   throws IOException {
     // delete the _temporary folder
-    cleanup(context);
+    cleanupJob(context);
   }
   
   /**

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java?rev=834284&r1=834283&r2=834284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java Tue Nov 10 00:19:42 2009
@@ -41,6 +41,8 @@
   private static String TEST_ROOT_DIR =
       new File(System.getProperty("test.build.data", "/tmp") + "/" 
                + "test-job-cleanup").toString();
+  private static final String CUSTOM_CLEANUP_FILE_NAME = 
+    "_custom_cleanup";
   private static final String ABORT_KILLED_FILE_NAME = 
     "_custom_abort_killed";
   private static final String ABORT_FAILED_FILE_NAME = 
@@ -86,6 +88,21 @@
   }
   
   /** 
+   * Committer overriding the deprecated {@link FileOutputCommitter#cleanupJob(JobContext)}
+   * to create a _custom_cleanup file in the output folder
+   */
+  static class CommitterWithCustomDeprecatedCleanup extends FileOutputCommitter {
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+      System.err.println("---- HERE ----");
+      JobConf conf = context.getJobConf();
+      Path outputPath = FileOutputFormat.getOutputPath(conf);
+      FileSystem fs = outputPath.getFileSystem(conf);
+      fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close();
+    }
+  }
+  
+  /** 
    * Committer with abort making a _failed/_killed in the output folder
    */
   static class CommitterWithCustomAbort extends FileOutputCommitter {
@@ -263,4 +280,26 @@
                   new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
                                 ABORT_FAILED_FILE_NAME});
   }
+
+  /**
+   * Test that a job with a custom committer runs the deprecated
+   * {@link FileOutputCommitter#cleanupJob(JobContext)} code, for API
+   * compatibility, on successful, failed and killed jobs.
+   */
+  public void testCustomCleanup() throws IOException {
+    // check with a successful job
+    testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME, 
+                      CommitterWithCustomDeprecatedCleanup.class,
+                      new String[] {});
+    
+    // check with a failed job
+    testFailedJob(CUSTOM_CLEANUP_FILE_NAME, 
+                  CommitterWithCustomDeprecatedCleanup.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+    
+    // check with a killed job
+    testKilledJob(TestJobCleanup.CUSTOM_CLEANUP_FILE_NAME, 
+                  CommitterWithCustomDeprecatedCleanup.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+  }
 }
\ No newline at end of file

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java?rev=834284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java Tue Nov 10 00:19:42 2009
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapred.UtilsForTests;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A JUnit test to test Map-Reduce job committer.
+ */
+public class TestJobOutputCommitter extends HadoopTestCase {
+
+  public TestJobOutputCommitter() throws IOException {
+    super(CLUSTER_MR, LOCAL_FS, 1, 1);
+  }
+
+  private static String TEST_ROOT_DIR = new File(System.getProperty(
+      "test.build.data", "/tmp")
+      + "/" + "test-job-cleanup").toString();
+  private static final String CUSTOM_CLEANUP_FILE_NAME = "_custom_cleanup";
+  private static final String ABORT_KILLED_FILE_NAME = "_custom_abort_killed";
+  private static final String ABORT_FAILED_FILE_NAME = "_custom_abort_failed";
+  private static Path inDir = new Path(TEST_ROOT_DIR, "test-input");
+  private static int outDirs = 0;
+  private FileSystem fs;
+  private Configuration conf = null;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    conf = createJobConf();
+    fs = getFileSystem();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    fs.delete(new Path(TEST_ROOT_DIR), true);
+    super.tearDown();
+  }
+
+  /** 
+   * Committer overriding the deprecated {@link FileOutputCommitter#cleanupJob(JobContext)}
+   * to create a _custom_cleanup file in the output folder
+   */
+  static class CommitterWithCustomDeprecatedCleanup extends FileOutputCommitter {
+    public CommitterWithCustomDeprecatedCleanup(Path outputPath,
+        TaskAttemptContext context) throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+      System.err.println("---- HERE ----");
+      Path outputPath = FileOutputFormat.getOutputPath(context);
+      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
+      fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close();
+    }
+  }
+  
+  /**
+   * Committer with abort making a _failed/_killed in the output folder
+   */
+  static class CommitterWithCustomAbort extends FileOutputCommitter {
+    public CommitterWithCustomAbort(Path outputPath, TaskAttemptContext context)
+        throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    public void abortJob(JobContext context, JobStatus.State state)
+        throws IOException {
+      Path outputPath = FileOutputFormat.getOutputPath(context);
+      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
+      String fileName = 
+        (state.equals(JobStatus.State.FAILED)) ? ABORT_FAILED_FILE_NAME
+          : ABORT_KILLED_FILE_NAME;
+      fs.create(new Path(outputPath, fileName)).close();
+    }
+  }
+
+  private Path getNewOutputDir() {
+    return new Path(TEST_ROOT_DIR, "output-" + outDirs++);
+  }
+
+  static class MyOutputFormatWithCustomAbort<K, V> 
+  extends TextOutputFormat<K, V> {
+    private OutputCommitter committer = null;
+
+    public synchronized OutputCommitter getOutputCommitter(
+        TaskAttemptContext context) throws IOException {
+      if (committer == null) {
+        Path output = getOutputPath(context);
+        committer = new CommitterWithCustomAbort(output, context);
+      }
+      return committer;
+    }
+  }
+
+  static class MyOutputFormatWithCustomCleanup<K, V> 
+  extends TextOutputFormat<K, V> {
+    private OutputCommitter committer = null;
+
+    public synchronized OutputCommitter getOutputCommitter(
+        TaskAttemptContext context) throws IOException {
+      if (committer == null) {
+        Path output = getOutputPath(context);
+        committer = new CommitterWithCustomDeprecatedCleanup(output, context);
+      }
+      return committer;
+    }
+  }
+
+  // run a job with 1 map and let it run to completion
+  private void testSuccessfulJob(String filename,
+      Class<? extends OutputFormat> output, String[] exclude) throws Exception {
+    Path outDir = getNewOutputDir();
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 0);
+    job.setOutputFormatClass(output);
+
+    assertTrue("Job failed!", job.waitForCompletion(true));
+
+    Path testFile = new Path(outDir, filename);
+    assertTrue("Done file missing for job " + job.getID(), fs.exists(testFile));
+
+    // check that files from the exclude set do not exist
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for successful job "
+          + job.getID(), fs.exists(file));
+    }
+  }
+
+  // run a job for which all the attempts simply fail.
+  private void testFailedJob(String fileName,
+      Class<? extends OutputFormat> output, String[] exclude) throws Exception {
+    Path outDir = getNewOutputDir();
+    Job job = MapReduceTestUtil.createFailJob(conf, outDir, inDir);
+    job.setOutputFormatClass(output);
+
+    assertFalse("Job did not fail!", job.waitForCompletion(true));
+
+    if (fileName != null) {
+      Path testFile = new Path(outDir, fileName);
+      assertTrue("File " + testFile + " missing for failed job " + job.getID(),
+          fs.exists(testFile));
+    }
+
+    // check that files from the exclude set do not exist
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for failed job "
+          + job.getID(), fs.exists(file));
+    }
+  }
+
+  // run a job that gets stuck in the mapper and kill it.
+  private void testKilledJob(String fileName,
+      Class<? extends OutputFormat> output, String[] exclude) throws Exception {
+    Path outDir = getNewOutputDir();
+    Job job = MapReduceTestUtil.createKillJob(conf, outDir, inDir);
+    job.setOutputFormatClass(output);
+
+    job.submit();
+
+    // wait for the setup to be completed
+    while (job.setupProgress() != 1.0f) {
+      UtilsForTests.waitFor(100);
+    }
+
+    job.killJob(); // kill the job
+
+    assertFalse("Job did not get kill", job.waitForCompletion(true));
+
+    if (fileName != null) {
+      Path testFile = new Path(outDir, fileName);
+      assertTrue("File " + testFile + " missing for job " + job.getID(), fs
+          .exists(testFile));
+    }
+
+    // check that files from the exclude set do not exist
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for killed job "
+          + job.getID(), fs.exists(file));
+    }
+  }
+
+  /**
+   * Test default cleanup/abort behavior
+   * 
+   * @throws Exception
+   */
+  public void testDefaultCleanupAndAbort() throws Exception {
+    // check with a successful job
+    testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
+                      TextOutputFormat.class, new String[] {});
+
+    // check with a failed job
+    testFailedJob(null, TextOutputFormat.class,
+                  new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME });
+
+    // check default abort job kill
+    testKilledJob(null, TextOutputFormat.class,
+                  new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME });
+  }
+
+  /**
+   * Test if a failed job with custom committer runs the abort code.
+   * 
+   * @throws Exception
+   */
+  public void testCustomAbort() throws Exception {
+    // check with a successful job
+    testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
+                      MyOutputFormatWithCustomAbort.class, 
+                      new String[] {ABORT_FAILED_FILE_NAME,
+                                    ABORT_KILLED_FILE_NAME});
+
+    // check with a failed job
+    testFailedJob(ABORT_FAILED_FILE_NAME, 
+                  MyOutputFormatWithCustomAbort.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME, 
+                                ABORT_KILLED_FILE_NAME});
+
+    // check with a killed job
+    testKilledJob(ABORT_KILLED_FILE_NAME, 
+                  MyOutputFormatWithCustomAbort.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME, 
+                                ABORT_FAILED_FILE_NAME});
+  }
+
+  /**
+   * Test that a job with a custom committer runs the deprecated
+   * {@link FileOutputCommitter#cleanupJob(JobContext)} code, for API
+   * compatibility, on successful, failed and killed jobs.
+   * @throws Exception 
+   */
+  public void testCustomCleanup() throws Exception {
+    // check with a successful job
+    testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME, 
+                      MyOutputFormatWithCustomCleanup.class, 
+                      new String[] {});
+
+    // check with a failed job
+    testFailedJob(CUSTOM_CLEANUP_FILE_NAME, 
+                  MyOutputFormatWithCustomCleanup.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+
+    // check with a killed job
+    testKilledJob(CUSTOM_CLEANUP_FILE_NAME, 
+                  MyOutputFormatWithCustomCleanup.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+  }
+}


