hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r802999 - in /hadoop/common/branches/branch-0.20: CHANGES.txt src/mapred/org/apache/hadoop/mapred/Task.java src/test/org/apache/hadoop/mapred/TestTaskCommit.java
Date Tue, 11 Aug 2009 05:51:55 GMT
Author: ddas
Date: Tue Aug 11 05:51:54 2009
New Revision: 802999

URL: http://svn.apache.org/viewvc?rev=802999&view=rev
Log:
MAPREDUCE-838. Fixes a problem in the way commit of task outputs happens. The bug was that
even if commit failed, the task would be declared as successful. Contributed by Amareshwari
Sriramadasu.

Added:
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskCommit.java
Modified:
    hadoop/common/branches/branch-0.20/CHANGES.txt
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java

Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=802999&r1=802998&r2=802999&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Tue Aug 11 05:51:54 2009
@@ -201,6 +201,10 @@
 
     HDFS-525. The SimpleDateFormat object in ListPathsServlet is not thread
     safe.  (Suresh Srinivas via szetszwo)
+
+    MAPREDUCE-838. Fixes a problem in the way commit of task outputs
+    happens. The bug was that even if commit failed, the task would
+    be declared as successful. (Amareshwari Sriramadasu via ddas)
  
 Release 0.20.0 - 2009-04-15
 

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java?rev=802999&r1=802998&r2=802999&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java Tue Aug 11 05:51:54 2009
@@ -742,27 +742,30 @@
           }
           reporter.setProgressFlag();
         }
-        // task can Commit now  
-        try {
-          LOG.info("Task " + taskId + " is allowed to commit now");
-          committer.commitTask(taskContext);
-          return;
-        } catch (IOException iee) {
-          LOG.warn("Failure committing: " + 
-                    StringUtils.stringifyException(iee));
-          discardOutput(taskContext);
-          throw iee;
-        }
+        break;
       } catch (IOException ie) {
         LOG.warn("Failure asking whether task can commit: " + 
             StringUtils.stringifyException(ie));
         if (--retries == 0) {
-          //if it couldn't commit a successfully then delete the output
+          //if it couldn't query successfully then delete the output
           discardOutput(taskContext);
           System.exit(68);
         }
       }
     }
+    
+    // task can Commit now  
+    try {
+      LOG.info("Task " + taskId + " is allowed to commit now");
+      committer.commitTask(taskContext);
+      return;
+    } catch (IOException iee) {
+      LOG.warn("Failure committing: " + 
+        StringUtils.stringifyException(iee));
+      //if it couldn't commit a successfully then delete the output
+      discardOutput(taskContext);
+      throw iee;
+    }
   }
 
   private 

Added: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskCommit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskCommit.java?rev=802999&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskCommit.java (added)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskCommit.java Tue Aug 11 05:51:54 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class TestTaskCommit extends HadoopTestCase {
+
+  static class CommitterWithCommitFail extends FileOutputCommitter {
+    public void commitTask(TaskAttemptContext context) throws IOException {
+      Path taskOutputPath = getTempTaskOutputPath(context);
+      TaskAttemptID attemptId = context.getTaskAttemptID();
+      JobConf job = context.getJobConf();
+      if (taskOutputPath != null) {
+        FileSystem fs = taskOutputPath.getFileSystem(job);
+        if (fs.exists(taskOutputPath)) {
+          throw new IOException();
+        }
+      }
+    }
+  }
+
+  public TestTaskCommit() throws IOException {
+    super(LOCAL_MR, LOCAL_FS, 1, 1);
+  }
+  
+  public void testCommitFail() throws IOException {
+    Path rootDir = 
+      new Path(System.getProperty("test.build.data",  "/tmp"), "test");
+    final Path inDir = new Path(rootDir, "input");
+    final Path outDir = new Path(rootDir, "output");
+    JobConf jobConf = createJobConf();
+    jobConf.setMaxMapAttempts(1);
+    jobConf.setOutputCommitter(CommitterWithCommitFail.class);
+    RunningJob rJob = UtilsForTests.runJob(jobConf, inDir, outDir, 1, 0);
+    rJob.waitForCompletion();
+    assertEquals(JobStatus.FAILED, rJob.getJobState());
+  }
+
+  public static void main(String[] argv) throws Exception {
+    TestTaskCommit td = new TestTaskCommit();
+    td.testCommitFail();
+  }
+}



Mime
View raw message