hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r1077050 - in /hadoop/common/branches/branch-0.20-security-patches: ./ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/ src/mapred/org/apache/hadoop/mapreduce/lib/output/ src/test/org/apache/hadoop/mapred/ src/te...
Date: Fri, 04 Mar 2011 03:35:41 GMT
Author: omalley
Date: Fri Mar  4 03:35:41 2011
New Revision: 1077050

URL: http://svn.apache.org/viewvc?rev=1077050&view=rev
Log:
commit 5c98aaf6bed0247e8a6c6dd763e14f50eb1b173c
Author: Arun C Murthy <acmurthy@apache.org>
Date:   Mon Nov 9 16:24:12 2009 -0800

    MAPREDUCE:1196 from https://issues.apache.org/jira/secure/attachment/12424351/MAPREDUCE-1196_yhadoop20.patch
    
    +++ b/YAHOO-CHANGES.txt
    +yahoo-hadoop-0.20.1-3092118005:
    +
    +    MAPREDUCE-1196. Fix FileOutputCommitter to use the deprecated cleanupJob
    +    api correctly. (acmurthy)
    +
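
For readers skimming the diffs below, the gist of MAPREDUCE-1196 is that FileOutputCommitter's commitJob/abortJob now delegate to the still-public, deprecated cleanupJob() instead of a private helper, so existing committers that only override cleanupJob() keep having their cleanup code run. The following is a minimal, hypothetical sketch of that delegation pattern; the class names (OutputCommitterSketch, LegacyCommitterSketch, JobContextSketch) are simplified placeholders, not the actual branch-0.20 classes shown in the diffs.

// Minimal sketch of the compatibility pattern this patch restores.
abstract class OutputCommitterSketch {

  // The new-style job hooks delegate to the deprecated cleanupJob(), so a
  // committer that only overrides cleanupJob() still has its code invoked.
  public void commitJob(JobContextSketch context) throws java.io.IOException {
    cleanupJob(context);   // previously routed through a private cleanup() helper
  }

  public void abortJob(JobContextSketch context, int runState)
      throws java.io.IOException {
    cleanupJob(context);   // same delegation on abort
  }

  // Deprecated, but kept public and overridable for API compatibility.
  @Deprecated
  public void cleanupJob(JobContextSketch context) throws java.io.IOException {
    // default behaviour: remove the job's temporary output directory
  }
}

// A pre-existing user committer like this keeps working without changes:
class LegacyCommitterSketch extends OutputCommitterSketch {
  @Override
  @Deprecated
  public void cleanupJob(JobContextSketch context) throws java.io.IOException {
    super.cleanupJob(context);
    // custom cleanup, e.g. drop a marker file in the job's output directory
  }
}

// Stand-in for org.apache.hadoop.mapred.JobContext in this sketch.
interface JobContextSketch {}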

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/lib/output/
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/build.xml
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCleanup.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java

Modified: hadoop/common/branches/branch-0.20-security-patches/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/build.xml?rev=1077050&r1=1077049&r2=1077050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/build.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/build.xml Fri Mar  4 03:35:41 2011
@@ -27,7 +27,7 @@
  
   <property name="Name" value="Hadoop"/>
   <property name="name" value="hadoop"/>
-  <property name="version" value="0.20.1.3092118004"/>
+  <property name="version" value="0.20.1.3092118005"/>
   <property name="final.name" value="${name}-${version}"/>
   <property name="year" value="2009"/>
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1077050&r1=1077049&r2=1077050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java Fri Mar  4 03:35:41 2011
@@ -77,13 +77,15 @@ public class FileOutputCommitter extends
   
   @Override
   public void commitJob(JobContext context) throws IOException {
-    cleanup(context);
+    cleanupJob(context);
     if (getOutputDirMarking(context.getJobConf())) {
       markSuccessfulOutputDir(context);
     }
   }
   
-  private void cleanup(JobContext context) throws IOException {
+  @Override
+  @Deprecated
+  public void cleanupJob(JobContext context) throws IOException {
     JobConf conf = context.getJobConf();
     // do the clean up of temporary directory
     Path outputPath = FileOutputFormat.getOutputPath(conf);
@@ -107,7 +109,7 @@ public class FileOutputCommitter extends
    */
   @Override
   public void abortJob(JobContext context, int runState) throws IOException {
-    cleanup(context);
+    cleanupJob(context);
   }
   
   public void setupTask(TaskAttemptContext context) throws IOException {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=1077050&r1=1077049&r2=1077050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java Fri Mar  4 03:35:41 2011
@@ -265,6 +265,18 @@ public class Job extends JobContext {  
   }
 
   /**
+   * Get the <i>progress</i> of the job's setup, as a float between 0.0 
+   * and 1.0.  When the job setup is completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's setup.
+   * @throws IOException
+   */
+  public float setupProgress() throws IOException {
+    ensureState(JobState.RUNNING);
+    return info.setupProgress();
+  }
+
+  /**
    * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
    * and 1.0.  When all map tasks have completed, the function returns 1.0.
    * 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=1077050&r1=1077049&r2=1077050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Fri Mar  4 03:35:41 2011
@@ -110,7 +110,7 @@ public class FileOutputCommitter extends
    */
   public void commitJob(JobContext context) throws IOException {
     // delete the _temporary folder
-    cleanup(context);
+    cleanupJob(context);
     // check if the o/p dir should be marked
     if (shouldMarkOutputDir(context.getConfiguration())) {
       // create a _success file in the o/p folder
@@ -118,8 +118,9 @@ public class FileOutputCommitter extends
     }
   }
 
-  private void cleanup(JobContext context) 
-  throws IOException {
+  @Override
+  @Deprecated
+  public void cleanupJob(JobContext context) throws IOException {
     if (outputPath != null) {
       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
       FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
@@ -139,7 +140,7 @@ public class FileOutputCommitter extends
   @Override
   public void abortJob(JobContext context, JobStatus.State state)
   throws IOException {
-    cleanup(context);
+    cleanupJob(context);
   }
   
   /**

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCleanup.java?rev=1077050&r1=1077049&r2=1077050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCleanup.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCleanup.java Fri Mar  4 03:35:41 2011
@@ -41,6 +41,8 @@ public class TestJobCleanup extends Test
   private static String TEST_ROOT_DIR =
       new File(System.getProperty("test.build.data", "/tmp") + "/" 
                + "test-job-cleanup").toString();
+  private static final String CUSTOM_CLEANUP_FILE_NAME = 
+    "_custom_cleanup";
   private static final String ABORT_KILLED_FILE_NAME = 
     "_custom_abort_killed";
   private static final String ABORT_FAILED_FILE_NAME = 
@@ -86,6 +88,21 @@ public class TestJobCleanup extends Test
   }
   
   /** 
+   * Committer with deprecated {@link FileOutputCommitter#cleanupJob(JobContext)}
+   * making a _failed/_killed in the output folder
+   */
+  static class CommitterWithCustomDeprecatedCleanup extends FileOutputCommitter {
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+      System.err.println("---- HERE ----");
+      JobConf conf = context.getJobConf();
+      Path outputPath = FileOutputFormat.getOutputPath(conf);
+      FileSystem fs = outputPath.getFileSystem(conf);
+      fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close();
+    }
+  }
+  
+  /** 
    * Committer with abort making a _failed/_killed in the output folder
    */
   static class CommitterWithCustomAbort extends FileOutputCommitter {
@@ -264,4 +281,26 @@ public class TestJobCleanup extends Test
                   new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
                                 ABORT_FAILED_FILE_NAME});
   }
+
+  /**
+   * Test if a failed job with custom committer runs the deprecated
+   * {@link FileOutputCommitter#cleanupJob(JobContext)} code for api 
+   * compatibility testing.
+   */
+  public void testCustomCleanup() throws IOException {
+    // check with a successful job
+    testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME, 
+                      CommitterWithCustomDeprecatedCleanup.class,
+                      new String[] {});
+    
+    // check with a failed job
+    testFailedJob(CUSTOM_CLEANUP_FILE_NAME, 
+                  CommitterWithCustomDeprecatedCleanup.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+    
+    // check with a killed job
+    testKilledJob(TestJobCleanup.CUSTOM_CLEANUP_FILE_NAME, 
+                  CommitterWithCustomDeprecatedCleanup.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1077050&r1=1077049&r2=1077050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Mar  4 03:35:41 2011
@@ -222,7 +222,7 @@ public class UtilsForTests {
   /**
    * A utility that waits for specified amount of time
    */
-  static void waitFor(long duration) {
+  public static void waitFor(long duration) {
     try {
       synchronized (waitLock) {
         waitLock.wait(duration);

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java?rev=1077050&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java Fri Mar  4 03:35:41 2011
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapred.UtilsForTests;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A JUnit test to test Map-Reduce job committer.
+ */
+public class TestJobOutputCommitter extends HadoopTestCase {
+
+  public TestJobOutputCommitter() throws IOException {
+    super(CLUSTER_MR, LOCAL_FS, 1, 1);
+  }
+
+  private static String TEST_ROOT_DIR = new File(System.getProperty(
+      "test.build.data", "/tmp")
+      + "/" + "test-job-cleanup").toString();
+  private static final String CUSTOM_CLEANUP_FILE_NAME = "_custom_cleanup";
+  private static final String ABORT_KILLED_FILE_NAME = "_custom_abort_killed";
+  private static final String ABORT_FAILED_FILE_NAME = "_custom_abort_failed";
+  private static Path inDir = new Path(TEST_ROOT_DIR, "test-input");
+  private static int outDirs = 0;
+  private FileSystem fs;
+  private Configuration conf = null;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    conf = createJobConf();
+    conf.setBoolean(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
+    fs = getFileSystem();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    fs.delete(new Path(TEST_ROOT_DIR), true);
+    super.tearDown();
+  }
+
+  /** 
+   * Committer with deprecated {@link FileOutputCommitter#cleanupJob(JobContext)}
+   * making a _failed/_killed in the output folder
+   */
+  static class CommitterWithCustomDeprecatedCleanup extends FileOutputCommitter {
+    public CommitterWithCustomDeprecatedCleanup(Path outputPath,
+        TaskAttemptContext context) throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+      System.err.println("---- HERE ----");
+      Path outputPath = FileOutputFormat.getOutputPath(context);
+      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
+      fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close();
+    }
+  }
+  
+  /**
+   * Committer with abort making a _failed/_killed in the output folder
+   */
+  static class CommitterWithCustomAbort extends FileOutputCommitter {
+    public CommitterWithCustomAbort(Path outputPath, TaskAttemptContext context)
+        throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    public void abortJob(JobContext context, JobStatus.State state)
+        throws IOException {
+      Path outputPath = FileOutputFormat.getOutputPath(context);
+      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
+      String fileName = 
+        (state.equals(JobStatus.State.FAILED)) ? ABORT_FAILED_FILE_NAME
+          : ABORT_KILLED_FILE_NAME;
+      fs.create(new Path(outputPath, fileName)).close();
+    }
+  }
+
+  private Path getNewOutputDir() {
+    return new Path(TEST_ROOT_DIR, "output-" + outDirs++);
+  }
+
+  static class MyOutputFormatWithCustomAbort<K, V> 
+  extends TextOutputFormat<K, V> {
+    private OutputCommitter committer = null;
+
+    public synchronized OutputCommitter getOutputCommitter(
+        TaskAttemptContext context) throws IOException {
+      if (committer == null) {
+        Path output = getOutputPath(context);
+        committer = new CommitterWithCustomAbort(output, context);
+      }
+      return committer;
+    }
+  }
+
+  static class MyOutputFormatWithCustomCleanup<K, V> 
+  extends TextOutputFormat<K, V> {
+    private OutputCommitter committer = null;
+
+    public synchronized OutputCommitter getOutputCommitter(
+        TaskAttemptContext context) throws IOException {
+      if (committer == null) {
+        Path output = getOutputPath(context);
+        committer = new CommitterWithCustomDeprecatedCleanup(output, context);
+      }
+      return committer;
+    }
+  }
+
+  // run a job with 1 map and let it run to completion
+  private void testSuccessfulJob(String filename,
+      Class<? extends OutputFormat> output, String[] exclude) throws Exception {
+    Path outDir = getNewOutputDir();
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 0);
+    job.setOutputFormatClass(output);
+
+    assertTrue("Job failed!", job.waitForCompletion(true));
+
+    Path testFile = new Path(outDir, filename);
+    assertTrue("Done file missing for job ", fs.exists(testFile));
+
+    // check if the files from the missing set exists
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse(
+          "File " + file + " should not be present for successful job ", fs
+              .exists(file));
+    }
+  }
+
+  // run a job for which all the attempts simply fail.
+  private void testFailedJob(String fileName,
+      Class<? extends OutputFormat> output, String[] exclude) throws Exception {
+    Path outDir = getNewOutputDir();
+    Job job = MapReduceTestUtil.createFailJob(conf, outDir, inDir);
+    job.setOutputFormatClass(output);
+
+    assertFalse("Job did not fail!", job.waitForCompletion(true));
+
+    if (fileName != null) {
+      Path testFile = new Path(outDir, fileName);
+      assertTrue("File " + testFile + " missing for failed job ", fs
+          .exists(testFile));
+    }
+
+    // check if the files from the missing set exists
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for failed job ", fs
+          .exists(file));
+    }
+  }
+
+  // run a job which gets stuck in mapper and kill it.
+  private void testKilledJob(String fileName,
+      Class<? extends OutputFormat> output, String[] exclude) throws Exception {
+    Path outDir = getNewOutputDir();
+    Job job = MapReduceTestUtil.createKillJob(conf, outDir, inDir);
+    job.setOutputFormatClass(output);
+
+    job.submit();
+
+    // wait for the setup to be completed
+    while (job.setupProgress() != 1.0f) {
+      UtilsForTests.waitFor(100);
+    }
+
+    job.killJob(); // kill the job
+
+    assertFalse("Job did not get kill", job.waitForCompletion(true));
+
+    if (fileName != null) {
+      Path testFile = new Path(outDir, fileName);
+      assertTrue("File " + testFile + " missing for job ", fs.exists(testFile));
+    }
+
+    // check if the files from the missing set exists
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for killed job ", fs
+          .exists(file));
+    }
+  }
+
+  /**
+   * Test default cleanup/abort behavior
+   * 
+   * @throws Exception
+   */
+  public void testDefaultCleanupAndAbort() throws Exception {
+    // check with a successful job
+    testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
+                      TextOutputFormat.class, new String[] {});
+
+    // check with a failed job
+    testFailedJob(null, TextOutputFormat.class,
+                  new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME });
+
+    // check default abort job kill
+    testKilledJob(null, TextOutputFormat.class,
+                  new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME });
+  }
+
+  /**
+   * Test if a failed job with custom committer runs the abort code.
+   * 
+   * @throws Exception
+   */
+  public void testCustomAbort() throws Exception {
+    // check with a successful job
+    testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
+                      MyOutputFormatWithCustomAbort.class, 
+                      new String[] {ABORT_FAILED_FILE_NAME, 
+                                    ABORT_KILLED_FILE_NAME});
+
+    // check with a failed job
+    testFailedJob(ABORT_FAILED_FILE_NAME,  
+                  MyOutputFormatWithCustomAbort.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME, 
+                                ABORT_KILLED_FILE_NAME});
+
+    // check with a killed job
+    testKilledJob(ABORT_KILLED_FILE_NAME, 
+                  MyOutputFormatWithCustomAbort.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME, 
+                                ABORT_FAILED_FILE_NAME});
+  }
+
+  /**
+   * Test if a failed job with custom committer runs the deprecated
+   * {@link FileOutputCommitter#cleanupJob(JobContext)} code for api 
+   * compatibility testing.
+   * @throws Exception 
+   */
+  public void testCustomCleanup() throws Exception {
+    
+    // check with a successful job
+    testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME, 
+                      MyOutputFormatWithCustomCleanup.class, 
+                      new String[] {});
+
+    // check with a failed job
+    testFailedJob(CUSTOM_CLEANUP_FILE_NAME, 
+                  MyOutputFormatWithCustomCleanup.class,
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+
+    // check with a killed job
+    testKilledJob(CUSTOM_CLEANUP_FILE_NAME,  
+                  MyOutputFormatWithCustomCleanup.class,
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+  }
+}


