Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 72F03FF12 for ; Wed, 24 Apr 2013 17:41:10 +0000 (UTC) Received: (qmail 80915 invoked by uid 500); 24 Apr 2013 17:41:10 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 80832 invoked by uid 500); 24 Apr 2013 17:41:09 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 80820 invoked by uid 99); 24 Apr 2013 17:41:09 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Apr 2013 17:41:09 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Apr 2013 17:41:04 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 4EA092388A68 for ; Wed, 24 Apr 2013 17:40:42 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1471559 - in /hadoop/common/branches/branch-1: ./ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/ src/test/org/apache/hadoop/mapreduce/ Date: Wed, 24 Apr 2013 17:40:41 -0000 To: common-commits@hadoop.apache.org From: acmurthy@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130424174042.4EA092388A68@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: acmurthy Date: Wed Apr 24 17:40:41 2013 New Revision: 1471559 URL: http://svn.apache.org/r1471559 Log: MAPREDUCE-4737. Ensure that mapreduce APIs are semantically consistent with mapred API w.r.t Mapper.cleanup and Reducer.cleanup; in the sense that cleanup is now called even if there is an error. The old mapred API already ensures that Mapper.close and Reducer.close are invoked during error handling. Note that it is an incompatible change, however end-users can override Mapper.run and Reducer.run to get the old (inconsistent) behaviour. Contributed by Arun C. Murthy. Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/MapTask.java hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Mapper.java hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Reducer.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1471559&r1=1471558&r2=1471559&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Wed Apr 24 17:40:41 2013 @@ -616,6 +616,14 @@ Release 1.2.0 - unreleased HADOOP-9492. Update testConf.xml for HADOOP-9473. (Jing Zhao via szetszwo) + MAPREDUCE-4737. 
Ensure that mapreduce APIs are semantically consistent + with mapred API w.r.t Mapper.cleanup and Reducer.cleanup; in the sense + that cleanup is now called even if there is an error. The old mapred API + already ensures that Mapper.close and Reducer.close are invoked during + error handling. Note that it is an incompatible change, however end-users + can override Mapper.run and Reducer.run to get the old (inconsistent) + behaviour. (acmurthy) + Release 1.1.2 - 2013.01.30 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1471559&r1=1471558&r2=1471559&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/MapTask.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/MapTask.java Wed Apr 24 17:40:41 2013 @@ -48,12 +48,11 @@ import org.apache.hadoop.fs.FileSystem.S import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.io.serializer.Deserializer; @@ -62,13 +61,8 @@ import org.apache.hadoop.io.serializer.S import org.apache.hadoop.mapred.IFile.Writer; import org.apache.hadoop.mapred.Merger.Segment; import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapreduce.split.JobSplit; -import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo; -import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex; -import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex; import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.IndexedSorter; import org.apache.hadoop.util.Progress; @@ -434,10 +428,14 @@ class MapTask extends Task { try { runner.run(in, new OldOutputCollector(collector, conf), reporter); collector.flush(); - } finally { - //close - in.close(); // close input + + in.close(); + in = null; collector.close(); + collector = null; + } finally { + closeQuietly(in); + closeQuietly(collector); } } @@ -764,7 +762,9 @@ class MapTask extends Task { input.initialize(split, mapperContext); mapper.run(mapperContext); input.close(); + input = null; output.close(mapperContext); + output = null; } catch (NoSuchMethodException e) { throw new IOException("Can't find Context constructor", e); } catch (InstantiationException e) { @@ -773,6 +773,9 @@ class MapTask extends Task { throw new IOException("Can't invoke Context constructor", e); } catch (IllegalAccessException e) { throw new IOException("Can't invoke Context constructor", e); + } finally { + closeQuietly(input); + closeQuietly(output, mapperContext); } } @@ -1739,5 +1742,59 @@ class MapTask extends Task { super(s); } } + + private + void 
closeQuietly(RecordReader c) { + if (c != null) { + try { + c.close(); + } catch (IOException ie) { + // Ignore + LOG.info("Ignoring exception during close for " + c, ie); + } + } + } + + private + void closeQuietly(MapOutputCollector c) { + if (c != null) { + try { + c.close(); + } catch (Exception ie) { + // Ignore + LOG.info("Ignoring exception during close for " + c, ie); + } + } + } + + private + void closeQuietly( + org.apache.hadoop.mapreduce.RecordReader c) { + if (c != null) { + try { + c.close(); + } catch (Exception ie) { + // Ignore + LOG.info("Ignoring exception during close for " + c, ie); + } + } + } + + + + private + void closeQuietly( + org.apache.hadoop.mapreduce.RecordWriter c, + org.apache.hadoop.mapreduce.Mapper.Context + mapperContext) { + if (c != null) { + try { + c.close(mapperContext); + } catch (Exception ie) { + // Ignore + LOG.info("Ignoring exception during close for " + c, ie); + } + } + } } Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1471559&r1=1471558&r2=1471559&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Wed Apr 24 17:40:41 2013 @@ -488,14 +488,16 @@ class ReduceTask extends Task { // make output collector String finalName = getOutputName(getPartition()); - final RecordWriter out = new OldTrackingRecordWriter( - reduceOutputCounter, job, reporter, finalName); + RecordWriter out = + new OldTrackingRecordWriter( + reduceOutputCounter, job, reporter, finalName); + final RecordWriter finalOut = out; OutputCollector collector = new OutputCollector() { public void collect(OUTKEY key, OUTVALUE value) throws IOException { - out.write(key, value); + finalOut.write(key, value); // indicate that progress update needs to be sent reporter.progress(); } @@ -528,18 +530,14 @@ class ReduceTask extends Task { //Clean up: repeated in catch block below reducer.close(); + reducer = null; + out.close(reporter); + out = null; //End of clean up. 
- } catch (IOException ioe) { - try { - reducer.close(); - } catch (IOException ignored) {} - - try { - out.close(reporter); - } catch (IOException ignored) {} - - throw ioe; + } finally { + IOUtils.cleanup(LOG, reducer); + closeQuietly(out, reporter); } } @@ -647,8 +645,11 @@ class ReduceTask extends Task { trackedRW, committer, reporter, comparator, keyClass, valueClass); - reducer.run(reducerContext); - trackedRW.close(reducerContext); + try { + reducer.run(reducerContext); + } finally { + trackedRW.close(reducerContext); + } } private static enum CopyOutputErrorType { @@ -3011,4 +3012,15 @@ class ReduceTask extends Task { return decompressedSize; } } + + private + void closeQuietly(RecordWriter c, Reporter r) { + if (c != null) { + try { + c.close(r); + } catch (Exception e) { + LOG.info("Exception in closing " + c, e); + } + } + } } Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Mapper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Mapper.java?rev=1471559&r1=1471558&r2=1471559&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Mapper.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Mapper.java Wed Apr 24 17:40:41 2013 @@ -140,9 +140,12 @@ public class Mapper { + + /** Map method with different behavior based on the thread id */ + public void map(LongWritable key, Text val, Context c) + throws IOException, InterruptedException { + throw new IOException("TestMapperReducerCleanup"); + } + + protected void cleanup(Context context) + throws IOException, InterruptedException { + mapCleanup = true; + super.cleanup(context); + } + } + + private static class TrackingTokenizerMapper extends TokenizerMapper { + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context) + throws IOException, InterruptedException { + mapCleanup = true; + super.cleanup(context); + } + + } + + private static class FailingReducer + extends Reducer { + + public void reduce(LongWritable key, Iterable vals, Context context) + throws IOException, InterruptedException { + throw new IOException("TestMapperReducerCleanup"); + } + + protected void cleanup(Context context) + throws IOException, InterruptedException { + reduceCleanup = true; + super.cleanup(context); + } + } + + private static class TrackingIntSumReducer extends IntSumReducer { + + protected void cleanup(Context context) + throws IOException, InterruptedException { + reduceCleanup = true; + super.cleanup(context); + } +} + + public static class TrackingTextInputFormat extends TextInputFormat { + + public static class TrackingRecordReader extends LineRecordReader { + @Override + public synchronized void close() throws IOException { + recordReaderCleanup = true; + super.close(); + } + } + + @Override + public RecordReader createRecordReader( + InputSplit split, TaskAttemptContext context) { + return new TrackingRecordReader(); + } + } + + @SuppressWarnings("rawtypes") + public static class TrackingTextOutputFormat extends TextOutputFormat { + + public static class TrackingRecordWriter extends LineRecordWriter { + + public TrackingRecordWriter(DataOutputStream out) { + super(out); + } + + @Override + public synchronized void close(TaskAttemptContext context) + throws IOException { + recordWriterCleanup = true; + super.close(context); + } 
+ + } + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext job) + throws IOException, InterruptedException { + Configuration conf = job.getConfiguration(); + + Path file = getDefaultWorkFile(job, ""); + FileSystem fs = file.getFileSystem(conf); + FSDataOutputStream fileOut = fs.create(file, false); + + return new TrackingRecordWriter(fileOut); + } + + } + + + /** + * Create a single input file in the input directory. + * @param dirPath the directory in which the file resides + * @param id the file id number + * @param numRecords how many records to write to each file. + */ + private void createInputFile(Path dirPath, int id, int numRecords) + throws IOException { + final String MESSAGE = "This is a line in a file: "; + + Path filePath = new Path(dirPath, "" + id); + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + + OutputStream os = fs.create(filePath); + BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os)); + + for (int i = 0; i < numRecords; i++) { + w.write(MESSAGE + id + " " + i + "\n"); + } + + w.close(); + } + + private final String INPUT_DIR = "input"; + private final String OUTPUT_DIR = "output"; + + private Path getInputPath() { + String dataDir = System.getProperty("test.build.data"); + if (null == dataDir) { + return new Path(INPUT_DIR); + } else { + return new Path(new Path(dataDir), INPUT_DIR); + } + } + + private Path getOutputPath() { + String dataDir = System.getProperty("test.build.data"); + if (null == dataDir) { + return new Path(OUTPUT_DIR); + } else { + return new Path(new Path(dataDir), OUTPUT_DIR); + } + } + + private Path createInput() throws IOException { + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + Path inputPath = getInputPath(); + + // Clear the input directory if it exists, first. 
+ if (fs.exists(inputPath)) { + fs.delete(inputPath, true); + } + + // Create an input file + createInputFile(inputPath, 0, 10); + + return inputPath; + } + + @Test + public void testMapCleanup() throws Exception { + reset(); + + Job job = new Job(); + + Path inputPath = createInput(); + Path outputPath = getOutputPath(); + + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + + if (fs.exists(outputPath)) { + fs.delete(outputPath, true); + } + + job.setMapperClass(FailingMapper.class); + job.setInputFormatClass(TrackingTextInputFormat.class); + job.setOutputFormatClass(TrackingTextOutputFormat.class); + job.setNumReduceTasks(0); + FileInputFormat.addInputPath(job, inputPath); + FileOutputFormat.setOutputPath(job, outputPath); + + job.waitForCompletion(true); + + Assert.assertTrue(mapCleanup); + Assert.assertTrue(recordReaderCleanup); + Assert.assertTrue(recordWriterCleanup); + } + + @Test + public void testReduceCleanup() throws Exception { + reset(); + + Job job = new Job(); + + Path inputPath = createInput(); + Path outputPath = getOutputPath(); + + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + + if (fs.exists(outputPath)) { + fs.delete(outputPath, true); + } + + job.setMapperClass(TrackingTokenizerMapper.class); + job.setReducerClass(FailingReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + job.setInputFormatClass(TrackingTextInputFormat.class); + job.setOutputFormatClass(TrackingTextOutputFormat.class); + job.setNumReduceTasks(1); + FileInputFormat.addInputPath(job, inputPath); + FileOutputFormat.setOutputPath(job, outputPath); + + job.waitForCompletion(true); + + Assert.assertTrue(mapCleanup); + Assert.assertTrue(reduceCleanup); + Assert.assertTrue(recordReaderCleanup); + Assert.assertTrue(recordWriterCleanup); + } + + @Test + public void testJobSuccessCleanup() throws Exception { + reset(); + + Job job = new Job(); + + Path inputPath = createInput(); + Path outputPath = getOutputPath(); + + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + + if (fs.exists(outputPath)) { + fs.delete(outputPath, true); + } + + job.setMapperClass(TrackingTokenizerMapper.class); + job.setReducerClass(TrackingIntSumReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + job.setInputFormatClass(TrackingTextInputFormat.class); + job.setOutputFormatClass(TrackingTextOutputFormat.class); + job.setNumReduceTasks(1); + FileInputFormat.addInputPath(job, inputPath); + FileOutputFormat.setOutputPath(job, outputPath); + + job.waitForCompletion(true); + + Assert.assertTrue(mapCleanup); + Assert.assertTrue(reduceCleanup); + Assert.assertTrue(recordReaderCleanup); + Assert.assertTrue(recordWriterCleanup); + } + +}
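
[Editor's note] The Mapper.java and Reducer.java hunks are truncated in the archived message above, so the following is a hedged, illustrative sketch only (not a verbatim copy of the branch-1 sources) of the run()-with-finally pattern the log message describes for the new mapreduce API: setup() runs first, the record loop runs inside try, and cleanup() runs in finally, so user cleanup code now executes even when map() throws. Class name is hypothetical.

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Sketch of the post-MAPREDUCE-4737 semantics: cleanup() is always reached.
    public class CleanupAlwaysMapper
        extends Mapper<LongWritable, Text, Text, NullWritable> {

      @Override
      public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
          while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
          }
        } finally {
          cleanup(context); // reached even if map() or nextKeyValue() throws
        }
      }
    }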
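
[Editor's note] The log message also notes that end-users can override Mapper.run (and Reducer.run) to keep the old, inconsistent behaviour. One possible way to do that, shown here only as an assumption-labelled example (the class is hypothetical and not part of the commit), is to omit the try/finally so cleanup() is skipped when map() fails:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Restores pre-MAPREDUCE-4737 semantics: cleanup() only runs on success.
    public class LegacyCleanupMapper
        extends Mapper<LongWritable, Text, Text, NullWritable> {

      @Override
      public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
          map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context); // skipped if any map() call throws
      }
    }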