hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r936042 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/lib/output/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapreduce/lib/output/
Date Tue, 20 Apr 2010 19:38:16 GMT
Author: cdouglas
Date: Tue Apr 20 19:38:15 2010
New Revision: 936042

URL: http://svn.apache.org/viewvc?rev=936042&view=rev
Log:
MAPREDUCE-1409. IOExceptions thrown from FileOutputCommitter::abortTask
should cause the task to fail. Contributed by Amareshwari Sriramadasu

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=936042&r1=936041&r2=936042&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Apr 20 19:38:15 2010
@@ -540,6 +540,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1062. Fix ReliabilityTest to work with retired jobs. (Sreekanth
     Ramakrishnan via cdouglas)
 
+    MAPREDUCE-1409. IOExceptions thrown from FileOutputCommitter::abortTask
+    should cause the task to fail. (Amareshwari Sriramadasu via cdouglas)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=936042&r1=936041&r2=936042&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java Tue Apr 20 19:38:15 2010
@@ -26,8 +26,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.util.StringUtils;
 
 /** An {@link OutputCommitter} that commits files specified 
  * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}. 
@@ -176,14 +174,10 @@ public class FileOutputCommitter extends
 
   public void abortTask(TaskAttemptContext context) throws IOException {
     Path taskOutputPath =  getTempTaskOutputPath(context);
-    try {
-      if (taskOutputPath != null) {
-        FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
-        context.getProgressible().progress();
-        fs.delete(taskOutputPath, true);
-      }
-    } catch (IOException ie) {
-      LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
+    if (taskOutputPath != null) {
+      FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
+      context.getProgressible().progress();
+      fs.delete(taskOutputPath, true);
     }
   }
 
@@ -204,38 +198,29 @@ public class FileOutputCommitter extends
 
   public boolean needsTaskCommit(TaskAttemptContext context) 
   throws IOException {
-    try {
-      Path taskOutputPath = getTempTaskOutputPath(context);
-      if (taskOutputPath != null) {
-        context.getProgressible().progress();
-        // Get the file-system for the task output directory
-        FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
-        // since task output path is created on demand, 
-        // if it exists, task needs a commit
-        if (fs.exists(taskOutputPath)) {
-          return true;
-        }
+    Path taskOutputPath = getTempTaskOutputPath(context);
+    if (taskOutputPath != null) {
+      context.getProgressible().progress();
+      // Get the file-system for the task output directory
+      FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
+      // since task output path is created on demand, 
+      // if it exists, task needs a commit
+      if (fs.exists(taskOutputPath)) {
+        return true;
       }
-    } catch (IOException  ioe) {
-      throw ioe;
     }
     return false;
   }
 
-  Path getTempTaskOutputPath(TaskAttemptContext taskContext) {
+  Path getTempTaskOutputPath(TaskAttemptContext taskContext) throws IOException {
     JobConf conf = taskContext.getJobConf();
     Path outputPath = FileOutputFormat.getOutputPath(conf);
     if (outputPath != null) {
       Path p = new Path(outputPath,
                      (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
                       "_" + taskContext.getTaskAttemptID().toString()));
-      try {
-        FileSystem fs = p.getFileSystem(conf);
-        return p.makeQualified(fs);
-      } catch (IOException ie) {
-        LOG.warn(StringUtils .stringifyException(ie));
-        return p;
-      }
+      FileSystem fs = p.getFileSystem(conf);
+      return p.makeQualified(fs);
     }
     return null;
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=936042&r1=936041&r2=936042&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Tue Apr 20 19:38:15 2010
@@ -214,16 +214,13 @@ public class FileOutputCommitter extends
 
   /**
    * Delete the work directory
+   * @throws IOException 
    */
   @Override
-  public void abortTask(TaskAttemptContext context) {
-    try {
-      if (workPath != null) { 
-        context.progress();
-        outputFileSystem.delete(workPath, true);
-      }
-    } catch (IOException ie) {
-      LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    if (workPath != null) { 
+      context.progress();
+      outputFileSystem.delete(workPath, true);
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=936042&r1=936041&r2=936042&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java Tue Apr 20 19:38:15 2010
@@ -19,27 +19,57 @@
 package org.apache.hadoop.mapred;
 
 import java.io.*;
+import java.net.URI;
+
 import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.JobContextImpl;
 import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.JobStatus;
 
 public class TestFileOutputCommitter extends TestCase {
   private static Path outDir = new Path(
-     System.getProperty("test.build.data", "."), "output");
+     System.getProperty("test.build.data", "/tmp"), "output");
 
   // A random task attempt id for testing.
   private static String attempt = "attempt_200707121733_0001_m_000000_0";
   private static TaskAttemptID taskID = TaskAttemptID.forName(attempt);
-
+  private Text key1 = new Text("key1");
+  private Text key2 = new Text("key2");
+  private Text val1 = new Text("val1");
+  private Text val2 = new Text("val2");
+  
   @SuppressWarnings("unchecked")
-  public void testCommitter() throws Exception {
-    JobConf job = new JobConf();
+  private void writeOutput(RecordWriter theRecordWriter, Reporter reporter)
+      throws IOException {
+    NullWritable nullWritable = NullWritable.get();
+
+    try {
+      theRecordWriter.write(key1, val1);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val1);
+      theRecordWriter.write(nullWritable, val2);
+      theRecordWriter.write(key2, nullWritable);
+      theRecordWriter.write(key1, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key2, val2);
+    } finally {
+      theRecordWriter.close(reporter);
+    }
+  }
+  
+  private void setConfForFileOutputCommitter(JobConf job) {
     job.set(JobContext.TASK_ATTEMPT_ID, attempt);
     job.setOutputCommitter(FileOutputCommitter.class);
     FileOutputFormat.setOutputPath(job, outDir);
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testCommitter() throws Exception {
+    JobConf job = new JobConf();
+    setConfForFileOutputCommitter(job);
     JobContext jContext = new JobContextImpl(job, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();
@@ -52,31 +82,18 @@ public class TestFileOutputCommitter ext
 
     // A reporter that does nothing
     Reporter reporter = Reporter.NULL;
+    // write output
     FileSystem localFs = FileSystem.getLocal(job);
     TextOutputFormat theOutputFormat = new TextOutputFormat();
     RecordWriter theRecordWriter =
       theOutputFormat.getRecordWriter(localFs, job, file, reporter);
-    Text key1 = new Text("key1");
-    Text key2 = new Text("key2");
-    Text val1 = new Text("val1");
-    Text val2 = new Text("val2");
-    NullWritable nullWritable = NullWritable.get();
+    writeOutput(theRecordWriter, reporter);
 
-    try {
-      theRecordWriter.write(key1, val1);
-      theRecordWriter.write(null, nullWritable);
-      theRecordWriter.write(null, val1);
-      theRecordWriter.write(nullWritable, val2);
-      theRecordWriter.write(key2, nullWritable);
-      theRecordWriter.write(key1, null);
-      theRecordWriter.write(null, null);
-      theRecordWriter.write(key2, val2);
-    } finally {
-      theRecordWriter.close(reporter);
-    }
+    // do commit
     committer.commitTask(tContext);
     committer.commitJob(jContext);
     
+    // validate output
     File expectedFile = new File(new Path(outDir, file).toString());
     StringBuffer expectedOutput = new StringBuffer();
     expectedOutput.append(key1).append('\t').append(val1).append("\n");
@@ -87,9 +104,113 @@ public class TestFileOutputCommitter ext
     expectedOutput.append(key2).append('\t').append(val2).append("\n");
     String output = UtilsForTests.slurp(expectedFile);
     assertEquals(output, expectedOutput.toString());
+
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public void testAbort() throws IOException {
+    JobConf job = new JobConf();
+    setConfForFileOutputCommitter(job);
+    JobContext jContext = new JobContextImpl(job, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter();
+    FileOutputFormat.setWorkOutputPath(job, committer
+        .getTempTaskOutputPath(tContext));
+
+    // do setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+    String file = "test.txt";
+
+    // A reporter that does nothing
+    Reporter reporter = Reporter.NULL;
+    // write output
+    FileSystem localFs = FileSystem.getLocal(job);
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(localFs,
+        job, file, reporter);
+    writeOutput(theRecordWriter, reporter);
+
+    // do abort
+    committer.abortTask(tContext);
+    File expectedFile = new File(new Path(committer
+        .getTempTaskOutputPath(tContext), file).toString());
+    assertFalse("task temp dir still exists", expectedFile.exists());
+
+    committer.abortJob(jContext, JobStatus.State.FAILED);
+    expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
+        .toString());
+    assertFalse("job temp dir still exists", expectedFile.exists());
+    assertEquals("Output directory not empty", 0, new File(outDir.toString())
+        .listFiles().length);
+    FileUtil.fullyDelete(new File(outDir.toString()));
   }
 
-  public static void main(String[] args) throws Exception {
-    new TestFileOutputCommitter().testCommitter();
+  public static class FakeFileSystem extends RawLocalFileSystem {
+    public FakeFileSystem() {
+      super();
+    }
+
+    public URI getUri() {
+      return URI.create("faildel:///");
+    }
+
+    @Override
+    public boolean delete(Path p, boolean recursive) throws IOException {
+      throw new IOException("fake delete failed");
+    }
+  }
+
+  public void testFailAbort() throws IOException {
+    JobConf job = new JobConf();
+    job.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
+    job.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
+    setConfForFileOutputCommitter(job);
+    JobContext jContext = new JobContextImpl(job, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter();
+    FileOutputFormat.setWorkOutputPath(job, committer
+        .getTempTaskOutputPath(tContext));
+
+    // do setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+    String file = "test.txt";
+
+    // A reporter that does nothing
+    Reporter reporter = Reporter.NULL;
+    // write output
+    FileSystem localFs = new FakeFileSystem();
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(localFs,
+        job, file, reporter);
+    writeOutput(theRecordWriter, reporter);
+
+    // do abort
+    Throwable th = null;
+    try {
+      committer.abortTask(tContext);
+    } catch (IOException ie) {
+      th = ie;
+    }
+    assertNotNull(th);
+    assertTrue(th instanceof IOException);
+    assertTrue(th.getMessage().contains("fake delete failed"));
+    File jobTmpDir = new File(new Path(outDir,
+        FileOutputCommitter.TEMP_DIR_NAME).toString());
+    File taskTmpDir = new File(jobTmpDir, "_" + taskID);
+    File expectedFile = new File(taskTmpDir, file);
+    assertTrue(expectedFile + " does not exists", expectedFile.exists());
+
+    th = null;
+    try {
+      committer.abortJob(jContext, JobStatus.State.FAILED);
+    } catch (IOException ie) {
+      th = ie;
+    }
+    assertNotNull(th);
+    assertTrue(th instanceof IOException);
+    assertTrue(th.getMessage().contains("fake delete failed"));
+    assertTrue("job temp dir does not exists", jobTmpDir.exists());
   }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=936042&r1=936041&r2=936042&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Tue Apr 20 19:38:15 2010
@@ -156,7 +156,7 @@ public class UtilsForTests {
     }
   }
 
-  static String slurp(File f) throws IOException {
+  public static String slurp(File f) throws IOException {
     int len = (int) f.length();
     byte[] buf = new byte[len];
     FileInputStream in = new FileInputStream(f);

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java?rev=936042&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java Tue Apr 20 19:38:15 2010
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.*;
+import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.UtilsForTests;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+public class TestFileOutputCommitter extends TestCase {
+  private static Path outDir = new Path(System.getProperty("test.build.data",
+      "/tmp"), "output");
+
+  // A random task attempt id for testing.
+  private static String attempt = "attempt_200707121733_0001_m_000000_0";
+  private static String partFile = "part-m-00000";
+  private static TaskAttemptID taskID = TaskAttemptID.forName(attempt);
+  private Text key1 = new Text("key1");
+  private Text key2 = new Text("key2");
+  private Text val1 = new Text("val1");
+  private Text val2 = new Text("val2");
+
+  @SuppressWarnings("unchecked")
+  private void writeOutput(RecordWriter theRecordWriter,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    NullWritable nullWritable = NullWritable.get();
+
+    try {
+      theRecordWriter.write(key1, val1);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val1);
+      theRecordWriter.write(nullWritable, val2);
+      theRecordWriter.write(key2, nullWritable);
+      theRecordWriter.write(key1, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key2, val2);
+    } finally {
+      theRecordWriter.close(context);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testCommitter() throws Exception {
+    Job job = new Job();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    committer.commitJob(jContext);
+
+    // validate output
+    File expectedFile = new File(new Path(outDir, partFile).toString());
+    StringBuffer expectedOutput = new StringBuffer();
+    expectedOutput.append(key1).append('\t').append(val1).append("\n");
+    expectedOutput.append(val1).append("\n");
+    expectedOutput.append(val2).append("\n");
+    expectedOutput.append(key2).append("\n");
+    expectedOutput.append(key1).append("\n");
+    expectedOutput.append(key2).append('\t').append(val2).append("\n");
+    String output = UtilsForTests.slurp(expectedFile);
+    assertEquals(output, expectedOutput.toString());
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testAbort() throws IOException, InterruptedException {
+    Job job = new Job();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // do setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do abort
+    committer.abortTask(tContext);
+    File expectedFile = new File(new Path(committer.getWorkPath(), partFile)
+        .toString());
+    assertFalse("task temp dir still exists", expectedFile.exists());
+
+    committer.abortJob(jContext, JobStatus.State.FAILED);
+    expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
+        .toString());
+    assertFalse("job temp dir still exists", expectedFile.exists());
+    assertEquals("Output directory not empty", 0, new File(outDir.toString())
+        .listFiles().length);
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public static class FakeFileSystem extends RawLocalFileSystem {
+    public FakeFileSystem() {
+      super();
+    }
+
+    public URI getUri() {
+      return URI.create("faildel:///");
+    }
+
+    @Override
+    public boolean delete(Path p, boolean recursive) throws IOException {
+      throw new IOException("fake delete failed");
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testFailAbort() throws IOException, InterruptedException {
+    Job job = new Job();
+    Configuration conf = job.getConfiguration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
+    conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    FileOutputFormat.setOutputPath(job, outDir);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // do setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
+    RecordWriter<?, ?> theRecordWriter = theOutputFormat
+        .getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do abort
+    Throwable th = null;
+    try {
+      committer.abortTask(tContext);
+    } catch (IOException ie) {
+      th = ie;
+    }
+    assertNotNull(th);
+    assertTrue(th instanceof IOException);
+    assertTrue(th.getMessage().contains("fake delete failed"));
+    File jobTmpDir = new File(new Path(outDir,
+        FileOutputCommitter.TEMP_DIR_NAME).toString());
+    File taskTmpDir = new File(jobTmpDir, "_" + taskID);
+    File expectedFile = new File(taskTmpDir, partFile);
+    assertTrue(expectedFile + " does not exists", expectedFile.exists());
+
+    th = null;
+    try {
+      committer.abortJob(jContext, JobStatus.State.FAILED);
+    } catch (IOException ie) {
+      th = ie;
+    }
+    assertNotNull(th);
+    assertTrue(th instanceof IOException);
+    assertTrue(th.getMessage().contains("fake delete failed"));
+    assertTrue("job temp dir does not exists", jobTmpDir.exists());
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+}



Mime
View raw message