Subject: svn commit: r1471557 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/...
Date: Wed, 24 Apr 2013 17:39:29 -0000
To: mapreduce-commits@hadoop.apache.org
From: acmurthy@apache.org
Message-Id: <20130424173929.72D592388A68@eris.apache.org>

Author: acmurthy
Date: Wed Apr 24 17:39:28 2013
New Revision: 1471557

URL: http://svn.apache.org/r1471557
Log:
Merge -c 1471556 from trunk to branch-2 to fix MAPREDUCE-4737. Ensure that the mapreduce APIs are semantically consistent with the mapred API w.r.t. Mapper.cleanup and Reducer.cleanup, in the sense that cleanup is now called even if there is an error. The old mapred API already ensures that Mapper.close and Reducer.close are invoked during error handling. Note that this is an incompatible change; however, end-users can override Mapper.run and Reducer.run to get the old (inconsistent) behaviour. Contributed by Arun C. Murthy.

Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java
      - copied unchanged from r1471556, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1471557&r1=1471556&r2=1471557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Apr 24 17:39:28 2013
@@ -180,6 +180,14 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5146. application classloader may be used too early to load
     classes. (Sangjin Lee via tomwhite)
 
+    MAPREDUCE-4737. Ensure that mapreduce APIs are semantically consistent
+    with mapred API w.r.t Mapper.cleanup and Reducer.cleanup; in the sense that
+    cleanup is now called even if there is an error. The old mapred API
+    already ensures that Mapper.close and Reducer.close are invoked during
+    error handling. Note that it is an incompatible change, however end-users
+    can override Mapper.run and Reducer.run to get the old (inconsistent)
+    behaviour. (acmurthy)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1471557&r1=1471556&r2=1471557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Wed Apr 24 17:39:28 2013
@@ -434,10 +434,15 @@ public class MapTask extends Task {
       }
       statusUpdate(umbilical);
       collector.flush();
-    } finally {
-      //close
-      in.close(); // close input
+
+      in.close();
+      in = null;
+
       collector.close();
+      collector = null;
+    } finally {
+      closeQuietly(in);
+      closeQuietly(collector);
     }
   }
 
@@ -753,13 +758,20 @@ public class MapTask extends Task {
           new WrappedMapper().getMapContext(
               mapContext);
 
-    input.initialize(split, mapperContext);
-    mapper.run(mapperContext);
-    mapPhase.complete();
-    setPhase(TaskStatus.Phase.SORT);
-    statusUpdate(umbilical);
-    input.close();
-    output.close(mapperContext);
+    try {
+      input.initialize(split, mapperContext);
+      mapper.run(mapperContext);
+      mapPhase.complete();
+      setPhase(TaskStatus.Phase.SORT);
+      statusUpdate(umbilical);
+      input.close();
+      input = null;
+      output.close(mapperContext);
+      output = null;
+    } finally {
+      closeQuietly(input);
+      closeQuietly(output, mapperContext);
+    }
   }
 
   class DirectMapOutputCollector
@@ -1949,4 +1961,55 @@ public class MapTask extends Task {
     }
   }
 
+  private
+  void closeQuietly(RecordReader c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (IOException ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+
+  private
+  void closeQuietly(MapOutputCollector c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+
+  private
+  void closeQuietly(
+      org.apache.hadoop.mapreduce.RecordReader c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+
+  private
+  void closeQuietly(
+      org.apache.hadoop.mapreduce.RecordWriter c,
+      org.apache.hadoop.mapreduce.Mapper.Context
+          mapperContext) {
+    if (c != null) {
+      try {
+        c.close(mapperContext);
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1471557&r1=1471556&r2=1471557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java Wed Apr 24 17:39:28 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
@@ -428,14 +429,15 @@ public class ReduceTask extends Task {
     // make output collector
     String finalName = getOutputName(getPartition());
 
-    final RecordWriter out = new OldTrackingRecordWriter(
+    RecordWriter out = new OldTrackingRecordWriter(
         this, job, reporter, finalName);
-
+    final RecordWriter finalOut = out;
+
     OutputCollector collector =
       new OutputCollector() {
         public void collect(OUTKEY key, OUTVALUE value)
           throws IOException {
-          out.write(key, value);
+          finalOut.write(key, value);
           // indicate that progress update needs to be sent
           reporter.progress();
         }
@@ -466,20 +468,14 @@ public class ReduceTask extends Task {
         values.informReduceProgress();
       }
 
-      //Clean up: repeated in catch block below
       reducer.close();
-      out.close(reporter);
-      //End of clean up.
-    } catch (IOException ioe) {
-      try {
-        reducer.close();
-      } catch (IOException ignored) {}
-
-      try {
-        out.close(reporter);
-      } catch (IOException ignored) {}
+      reducer = null;
 
-      throw ioe;
+      out.close(reporter);
+      out = null;
+    } finally {
+      IOUtils.cleanup(LOG, reducer);
+      closeQuietly(out, reporter);
     }
   }
 
@@ -645,7 +641,21 @@ public class ReduceTask extends Task {
                                                committer,
                                                reporter, comparator, keyClass,
                                                valueClass);
-    reducer.run(reducerContext);
-    trackedRW.close(reducerContext);
+    try {
+      reducer.run(reducerContext);
+    } finally {
+      trackedRW.close(reducerContext);
+    }
+  }
+
+  private
+  void closeQuietly(RecordWriter c, Reporter r) {
+    if (c != null) {
+      try {
+        c.close(r);
+      } catch (Exception e) {
+        LOG.info("Exception in closing " + c, e);
+      }
+    }
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java?rev=1471557&r1=1471556&r2=1471557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java Wed Apr 24 17:39:28 2013
@@ -140,9 +140,12 @@ public class Mapper iter = context.getValues().iterator();
-      if(iter instanceof ReduceContext.ValueIterator) {
-        ((ReduceContext.ValueIterator)iter).resetBackupStore();
+    try {
+      while (context.nextKey()) {
+        reduce(context.getCurrentKey(), context.getValues(), context);
+        // If a back up store is used, reset it
+        Iterator iter = context.getValues().iterator();
+        if(iter instanceof ReduceContext.ValueIterator) {
+          ((ReduceContext.ValueIterator)iter).resetBackupStore();
+        }
       }
+    } finally {
+      cleanup(context);
     }
-    cleanup(context);
   }
 }
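The core point of the log message can be illustrated without Hadoop on the classpath: cleanup now runs even when map or reduce throws, and overriding run brings the old behaviour back. What follows is a minimal sketch under that assumption. SketchMapper, OldStyleMapper and CleanupDemo are hypothetical classes that model only the setup/map/cleanup lifecycle over a plain Iterator. They are not the Hadoop API, whose Mapper.run takes a Context.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for org.apache.hadoop.mapreduce.Mapper. It models only
// the setup/map/cleanup lifecycle; the real run() takes a Context, not an Iterator.
class SketchMapper {
  // Records which lifecycle hooks ran, in order.
  final List<String> calls = new ArrayList<>();

  protected void setup() {
    calls.add("setup");
  }

  protected void map(String value) {
    calls.add("map:" + value);
    if (value.equals("bad")) {
      throw new IllegalStateException("cannot map " + value);
    }
  }

  protected void cleanup() {
    calls.add("cleanup");
  }

  // Post-change shape: cleanup() sits in a finally block, so it runs even
  // when map() throws; the exception still propagates to the caller.
  public void run(Iterator<String> input) {
    setup();
    try {
      while (input.hasNext()) {
        map(input.next());
      }
    } finally {
      cleanup();
    }
  }
}

// Overriding run() without the try/finally restores the pre-change behaviour:
// cleanup() is reached only if every call to map() returned normally.
class OldStyleMapper extends SketchMapper {
  @Override
  public void run(Iterator<String> input) {
    setup();
    while (input.hasNext()) {
      map(input.next());
    }
    cleanup();
  }
}

public class CleanupDemo {
  // Runs the mapper, swallows the expected failure, and returns the hook log.
  static List<String> runAndRecord(SketchMapper mapper, List<String> values) {
    try {
      mapper.run(values.iterator());
    } catch (IllegalStateException expected) {
      // The failure is expected here; only the recorded hooks matter.
    }
    return mapper.calls;
  }

  public static void main(String[] args) {
    List<String> values = List.of("a", "bad", "c");
    System.out.println(runAndRecord(new SketchMapper(), values));   // [setup, map:a, map:bad, cleanup]
    System.out.println(runAndRecord(new OldStyleMapper(), values)); // [setup, map:a, map:bad]
  }
}
```

In both cases the exception still escapes run, so the task fails either way. The only difference is whether cleanup got a chance to release resources first.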
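MapTask and ReduceTask now follow a close-then-null idiom. On the success path each resource is closed explicitly and its reference set to null, and the finally block passes whatever is still non-null to a closeQuietly helper. The sketch below reproduces that shape with plain AutoCloseable objects. It is an illustration, not Hadoop code: Resource, runTask and the events list are hypothetical stand-ins for the RecordReader, RecordWriter and collector types and for the LOG.info call in the patch.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseQuietlyDemo {
  // Event log so the behaviour can be inspected without a real logger.
  static final List<String> events = new ArrayList<>();

  // Hypothetical stand-in for a RecordReader/RecordWriter.
  static final class Resource implements AutoCloseable {
    private final String name;
    private final boolean failOnClose;

    Resource(String name, boolean failOnClose) {
      this.name = name;
      this.failOnClose = failOnClose;
    }

    @Override
    public void close() throws Exception {
      events.add("close:" + name);
      if (failOnClose) {
        throw new Exception(name + " close failed");
      }
    }
  }

  // Same shape as the patch's closeQuietly helpers: skip null references and
  // swallow (here: record) any exception thrown by close().
  static void closeQuietly(AutoCloseable c) {
    if (c != null) {
      try {
        c.close();
      } catch (Exception e) {
        events.add("ignored:" + e.getMessage());
      }
    }
  }

  // On success each resource is closed explicitly, so a close() failure
  // still fails the task, and then nulled. The finally block only closes
  // what is still non-null, i.e. what the error path left open.
  static void runTask(boolean failInBody, boolean outFailsOnClose) throws Exception {
    Resource in = new Resource("in", false);
    Resource out = new Resource("out", outFailsOnClose);
    try {
      if (failInBody) {
        throw new IllegalStateException("task body failed");
      }
      in.close();
      in = null;
      out.close();
      out = null;
    } finally {
      closeQuietly(in);
      closeQuietly(out);
    }
  }

  public static void main(String[] args) throws Exception {
    runTask(false, false);
    System.out.println(events); // [close:in, close:out]

    events.clear();
    try {
      runTask(true, true);
    } catch (IllegalStateException e) {
      // The body's exception, not the close() failure, reaches the caller.
      events.add("caught:" + e.getMessage());
    }
    System.out.println(events);
  }
}
```

The sketch also shows one consequence of this shape. If an explicit close() on the success path throws, its reference is never nulled, so the finally block calls close() on it a second time, quietly. For the old-API reducer, the diff itself uses org.apache.hadoop.io.IOUtils.cleanup(LOG, reducer) rather than a hand-written helper.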