Subject: svn commit: r755792 - in /hadoop/core/branches/branch-0.20: ./ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/ src/test/org/apache/hadoop/mapreduce/
Date: Thu, 19 Mar 2009 00:00:58 -0000
To: core-commits@hadoop.apache.org
From: omalley@apache.org

Author: omalley
Date: Thu Mar 19 00:00:58 2009
New Revision: 755792

URL: http://svn.apache.org/viewvc?rev=755792&view=rev
Log:
HADOOP-5382. Support combiners in the new context object API. (omalley)
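For job authors, the practical effect of this change is that a job written purely against the new org.apache.hadoop.mapreduce API can now register a combiner and have the map and reduce tasks actually run it. The following is an illustrative sketch only, not part of this commit: the WordCount-style mapper/reducer classes and the driver name are assumed for the example, and the no-argument waitForCompletion() matches the branch-0.20 snapshot used by the test updated below.

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Hypothetical example class, assumed for illustration of HADOOP-5382.
    public class NewApiCombinerExample {

      public static class TokenizerMapper
          extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, ONE);          // one record per token
          }
        }
      }

      // A new-API Reducer doubles as the combiner; with this patch the
      // CombinerRunner added to Task.java drives it through a Reducer.Context
      // during map-side spills and reduce-side merges.
      public static class IntSumReducer
          extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
            sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }

      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "word count");
        job.setJarByClass(NewApiCombinerExample.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);   // enabled by HADOOP-5382
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion() ? 0 : 1);
      }
    }

As in the updated TestMapReduceLocal below, the same Reducer class typically serves as both combiner and reducer.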
Modified:
    hadoop/core/branches/branch-0.20/   (props changed)
    hadoop/core/branches/branch-0.20/CHANGES.txt   (contents, props changed)
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
    hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

Propchange: hadoop/core/branches/branch-0.20/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 19 00:00:58 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19:713112
-/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746338,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426
+/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746338,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Thu Mar 19 00:00:58 2009
@@ -642,8 +642,9 @@
     HADOOP-5255. Fix use of Math.abs to avoid overflow. (Jonathan Ellis
     via cdouglas)
 
-    HADOOP-5269. Fixes a problem to do with tasktracker holding on to FAILED_UNCLEAN
-    or KILLED_UNCLEAN tasks forever. (Amareshwari Sriramadasu via ddas)
+    HADOOP-5269. Fixes a problem to do with tasktracker holding on to
+    FAILED_UNCLEAN or KILLED_UNCLEAN tasks forever. (Amareshwari Sriramadasu
+    via ddas)
 
     HADOOP-5214. Fixes a ConcurrentModificationException while the Fairshare
     Scheduler accesses the tasktrackers stored by the JobTracker.
@@ -651,37 +652,41 @@
     HADOOP-5233. Addresses the three issues - Race condition in updating
     status, NPE in TaskTracker task localization when the conf file is missing
-    (HADOOP-5234) and NPE in handling KillTaskAction of a cleanup task (HADOOP-5235).
-    (Amareshwari Sriramadasu via ddas)
+    (HADOOP-5234) and NPE in handling KillTaskAction of a cleanup task
+    (HADOOP-5235). (Amareshwari Sriramadasu via ddas)
 
     HADOOP-5247. Introduces a broadcast of KillJobAction to all trackers when
-    a job finishes. This fixes a bunch of problems to do with NPE when a completed
-    job is not in memory and a tasktracker comes to the jobtracker with a status
-    report of a task belonging to that job. (Amar Kamat via ddas)
-
-    HADOOP-5282. Fixed job history logs for task attempts that are failed by the
-    JobTracker, say due to lost task trackers. (Amar Kamat via yhemanth)
+    a job finishes. This fixes a bunch of problems to do with NPE when a
+    completed job is not in memory and a tasktracker comes to the jobtracker
+    with a status report of a task belonging to that job. (Amar Kamat via ddas)
+
+    HADOOP-5282. Fixed job history logs for task attempts that are
+    failed by the JobTracker, say due to lost task trackers. (Amar
+    Kamat via yhemanth)
 
     HADOOP-4963. Fixes a logging to do with getting the location of
     map output file. (Amareshwari Sriramadasu via ddas)
 
     HADOOP-5292. Fix NPE in KFS::getBlockLocations. (Sriram Rao via lohit)
 
-    HADOOP-5241. Fixes a bug in disk-space resource estimation. Makes the estimation
-    formula linear where blowUp = Total-Output/Total-Input. (Sharad Agarwal via ddas)
+    HADOOP-5241. Fixes a bug in disk-space resource estimation. Makes
+    the estimation formula linear where blowUp =
+    Total-Output/Total-Input. (Sharad Agarwal via ddas)
 
     HADOOP-5142. Fix MapWritable#putAll to store key/value classes.
     (Doğacan Güney via enis)
 
-    HADOOP-4744. Workaround for jetty6 returning -1 when getLocalPort is invoked on
-    the connector. The workaround patch retries a few times before failing.
-    (Jothi Padmanabhan via yhemanth)
-
-    HADOOP-5280. Adds a check to prevent a task state transition from FAILED to any of
-    UNASSIGNED, RUNNING, COMMIT_PENDING or SUCCEEDED. (ddas)
-
-    HADOOP-5272. Fixes a problem to do with detecting whether an attempt is the first
-    attempt of a Task. This affects JobTracker restart. (Amar Kamat via ddas)
+    HADOOP-4744. Workaround for jetty6 returning -1 when getLocalPort
+    is invoked on the connector. The workaround patch retries a few
+    times before failing. (Jothi Padmanabhan via yhemanth)
+
+    HADOOP-5280. Adds a check to prevent a task state transition from
+    FAILED to any of UNASSIGNED, RUNNING, COMMIT_PENDING or
+    SUCCEEDED. (ddas)
+
+    HADOOP-5272. Fixes a problem to do with detecting whether an
+    attempt is the first attempt of a Task. This affects JobTracker
+    restart. (Amar Kamat via ddas)
 
     HADOOP-5306. Fixes a problem to do with logging/parsing the http port of a
     lost tracker. Affects JobTracker restart. (Amar Kamat via ddas)
@@ -690,12 +695,13 @@
 
     HADOOP-5274. Fix gridmix2 dependency on wordcount example. (cdouglas)
 
-    HADOOP-5145. Balancer sometimes runs out of memory after running days or weeks.
-    (hairong)
+    HADOOP-5145. Balancer sometimes runs out of memory after running
+    days or weeks. (hairong)
 
-    HADOOP-5338. Fix jobtracker restart to clear task completion events cached by
-    tasktrackers forcing them to fetch all events afresh, thus avoiding missed
-    task completion events on the tasktrackers. (Amar Kamat via yhemanth)
+    HADOOP-5338. Fix jobtracker restart to clear task completion
+    events cached by tasktrackers forcing them to fetch all events
+    afresh, thus avoiding missed task completion events on the
+    tasktrackers. (Amar Kamat via yhemanth)
 
     HADOOP-4695. Change TestGlobalFilter so that it allows a web page to be
     filtered more than once for a single access. (Kan Zhang via szetszwo)
@@ -716,28 +722,32 @@
     HADOOP-5395. Change the exception message when a job is submitted to an
     invalid queue. (Rahul Kumar Singh via yhemanth)
 
-    HADOOP-5276. Fixes a problem to do with updating the start time of a task when
-    the tracker that ran the task is lost. (Amar Kamat via ddas)
+    HADOOP-5276. Fixes a problem to do with updating the start time of
+    a task when the tracker that ran the task is lost. (Amar Kamat via
+    ddas)
 
-    HADOOP-5278. Fixes a problem to do with logging the finish time of a task
-    during recovery (after a JobTracker restart). (Amar Kamat via ddas)
+    HADOOP-5278. Fixes a problem to do with logging the finish time of
+    a task during recovery (after a JobTracker restart). (Amar Kamat
+    via ddas)
 
-    HADOOP-5490. Fixes a synchronization problem in the EagerTaskInitializationListener
-    class. (Jothi Padmanabhan via ddas)
+    HADOOP-5490. Fixes a synchronization problem in the
+    EagerTaskInitializationListener class. (Jothi Padmanabhan via
+    ddas)
 
-    HADOOP-5493. The shuffle copier threads return the codecs back to the pool when the
-    shuffle completes. (Jothi Padmanabhan via ddas)
+    HADOOP-5493. The shuffle copier threads return the codecs back to
+    the pool when the shuffle completes. (Jothi Padmanabhan via ddas)
 
     HADOOP-5505. Fix JspHelper initialization in the context of
     MiniDFSCluster. (Raghu Angadi)
 
-    HADOOP-5414. Fixes IO exception while executing hadoop fs -touchz fileName by
-    making sure that lease renewal thread exits before dfs client exits.
-    (hairong)
-
-    HADOOP-5103. FileInputFormat now reuses the clusterMap network topology object
-    and that brings down the log messages in the JobClient to do with
-    NetworkTopology.add significantly. (Jothi Padmanabhan via ddas)
+    HADOOP-5414. Fixes IO exception while executing hadoop fs -touchz
+    fileName by making sure that lease renewal thread exits before dfs
+    client exits. (hairong)
+
+    HADOOP-5103. FileInputFormat now reuses the clusterMap network
+    topology object and that brings down the log messages in the
+    JobClient to do with NetworkTopology.add significantly. (Jothi
+    Padmanabhan via ddas)
 
     HADOOP-5483. Fixes a problem in the Directory Cleanup Thread due to which
     TestMiniMRWithDFS sometimes used to fail. (ddas)
@@ -751,9 +761,11 @@
 
     HADOOP-5514. Fix JobTracker metrics and add metrics for wating, failed
     tasks. (cdouglas)
 
-    HADOOP-5516. Fix NullPointerException in TaskMemoryManagerThread that comes when
-    monitored processes disappear when the thread is running.
-    (Vinod Kumar Vavilapalli via yhemanth)
+    HADOOP-5516. Fix NullPointerException in TaskMemoryManagerThread
+    that comes when monitored processes disappear when the thread is
+    running. (Vinod Kumar Vavilapalli via yhemanth)
+
+    HADOOP-5382. Support combiners in the new context object API. (omalley)
 
 Release 0.19.2 - Unreleased

Propchange: hadoop/core/branches/branch-0.20/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 19 00:00:58 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
 /hadoop/core/branches/branch-0.19/CHANGES.txt:713112
-/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426
+/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java Thu Mar 19 00:00:58 2009
@@ -312,7 +312,9 @@
   void runOldMapper(final JobConf job,
                     final BytesWritable rawSplit,
                     final TaskUmbilicalProtocol umbilical,
-                    TaskReporter reporter) throws IOException {
+                    TaskReporter reporter
+                    ) throws IOException, InterruptedException,
+                             ClassNotFoundException {
     InputSplit inputSplit = null;
     // reinstantiate the split
     try {
@@ -429,7 +431,7 @@
     NewOutputCollector(JobConf job,
                        TaskUmbilicalProtocol umbilical,
                        TaskReporter reporter
-                       ) throws IOException {
+                       ) throws IOException, ClassNotFoundException {
       collector = new MapOutputBuffer(umbilical, job, reporter);
     }
 
@@ -439,8 +441,13 @@
     }
 
     @Override
-    public void close(TaskAttemptContext context) throws IOException {
-      collector.flush();
+    public void close(TaskAttemptContext context
+                      ) throws IOException,InterruptedException {
+      try {
+        collector.flush();
+      } catch (ClassNotFoundException cnf) {
+        throw new IOException("can't find class ", cnf);
+      }
       collector.close();
     }
   }
@@ -525,9 +532,10 @@
   interface MapOutputCollector extends OutputCollector {
 
-    public void close() throws IOException;
+    public void close() throws IOException, InterruptedException;
 
-    public void flush() throws IOException;
+    public void flush() throws IOException, InterruptedException,
+                               ClassNotFoundException;
 
   }
 
@@ -559,7 +567,8 @@
     }
 
-    public void flush() throws IOException {
+    public void flush() throws IOException, InterruptedException,
+                               ClassNotFoundException {
     }
 
     public void collect(K key, V value) throws IOException {
@@ -582,7 +591,7 @@
     private final SerializationFactory serializationFactory;
     private final Serializer keySerializer;
     private final Serializer valSerializer;
-    private final Class combinerClass;
+    private final CombinerRunner combinerRunner;
     private final CombineOutputCollector combineCollector;
 
     // Compression for map-outputs
@@ -627,7 +636,6 @@
     private final Counters.Counter mapOutputByteCounter;
     private final Counters.Counter mapOutputRecordCounter;
-    private final Counters.Counter combineInputCounter;
     private final Counters.Counter combineOutputCounter;
 
     private ArrayList indexCacheList;
@@ -636,7 +644,8 @@
     @SuppressWarnings("unchecked")
     public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
-                           TaskReporter reporter) throws IOException {
+                           TaskReporter reporter
+                           ) throws IOException, ClassNotFoundException {
       this.job = job;
       this.reporter = reporter;
       localFs = FileSystem.getLocal(job);
@@ -688,7 +697,8 @@
       // counters
       mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
       mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
-      combineInputCounter = reporter.getCounter(COMBINE_INPUT_RECORDS);
+      Counters.Counter combineInputCounter =
+        reporter.getCounter(COMBINE_INPUT_RECORDS);
       combineOutputCounter = reporter.getCounter(COMBINE_OUTPUT_RECORDS);
       // compression
       if (job.getCompressMapOutput()) {
@@ -697,10 +707,14 @@
         codec = ReflectionUtils.newInstance(codecClass, job);
       }
       // combiner
-      combinerClass = job.getCombinerClass();
-      combineCollector = (null != combinerClass)
-        ? new CombineOutputCollector(combineOutputCounter)
-        : null;
+      combinerRunner = CombinerRunner.create(job, getTaskID(),
+                                             combineInputCounter,
+                                             reporter, null);
+      if (combinerRunner != null) {
+        combineCollector= new CombineOutputCollector(combineOutputCounter);
+      } else {
+        combineCollector = null;
+      }
       minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
       spillThread.setDaemon(true);
       spillThread.setName("SpillThread");
@@ -995,7 +1009,8 @@
       }
     }
 
-    public synchronized void flush() throws IOException {
+    public synchronized void flush() throws IOException, ClassNotFoundException,
+                                            InterruptedException {
      LOG.info("Starting flush of map output");
       spillLock.lock();
       try {
@@ -1085,7 +1100,8 @@
       spillReady.signal();
     }
 
-    private void sortAndSpill() throws IOException {
+    private void sortAndSpill() throws IOException, ClassNotFoundException,
+                                       InterruptedException {
       //approximate the length of the output file to be the length of the
       //buffer + header lengths for the partitions
       long size = (bufend >= bufstart
@@ -1113,7 +1129,7 @@
           long segmentStart = out.getPos();
           writer = new Writer(job, out, keyClass, valClass, codec,
                               spilledRecordsCounter);
-          if (null == combinerClass) {
+          if (combinerRunner == null) {
             // spill directly
             DataInputBuffer key = new DataInputBuffer();
             while (spindex < endPosition &&
@@ -1140,7 +1156,7 @@
             combineCollector.setWriter(writer);
             RawKeyValueIterator kvIter =
              new MRResultIterator(spstart, spindex);
-            combineAndSpill(kvIter, combineInputCounter);
+            combinerRunner.combine(kvIter, combineCollector);
           }
         }
@@ -1257,25 +1273,6 @@
       vbytes.reset(kvbuffer, kvindices[kvoff + VALSTART], vallen);
     }
 
-    @SuppressWarnings("unchecked")
-    private void combineAndSpill(RawKeyValueIterator kvIter,
-                                 Counters.Counter inCounter) throws IOException {
-      Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
-      try {
-        CombineValuesIterator values = new CombineValuesIterator(
-            kvIter, comparator, keyClass, valClass, job, reporter,
-            inCounter);
-        while (values.more()) {
-          combiner.reduce(values.getKey(), values, combineCollector, reporter);
-          values.nextKey();
-          // indicate we're making progress
-          reporter.progress();
-        }
-      } finally {
-        combiner.close();
-      }
-    }
-
     /**
      * Inner class wrapping valuebytes, used for appendRaw.
      */
@@ -1329,7 +1326,8 @@
       public void close() { }
     }
 
-    private void mergeParts() throws IOException {
+    private void mergeParts() throws IOException, InterruptedException,
+                                     ClassNotFoundException {
       // get the approximate size of the final output/index files
       long finalOutFileSize = 0;
       long finalIndexFileSize = 0;
@@ -1428,11 +1426,11 @@
       Writer writer =
           new Writer(job, finalOut, keyClass, valClass, codec,
                      spilledRecordsCounter);
-      if (null == combinerClass || numSpills < minSpillsForCombine) {
+      if (combinerRunner == null || numSpills < minSpillsForCombine) {
         Merger.writeFile(kvIter, writer, reporter, job);
       } else {
         combineCollector.setWriter(writer);
-        combineAndSpill(kvIter, combineInputCounter);
+        combinerRunner.combine(kvIter, combineCollector);
       }
 
       //close

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu Mar 19 00:00:58 2009
@@ -119,8 +119,6 @@
     getCounters().findCounter(Counter.REDUCE_INPUT_RECORDS);
   private Counters.Counter reduceOutputCounter =
     getCounters().findCounter(Counter.REDUCE_OUTPUT_RECORDS);
-  private Counters.Counter reduceCombineInputCounter =
-    getCounters().findCounter(Counter.COMBINE_INPUT_RECORDS);
   private Counters.Counter reduceCombineOutputCounter =
    getCounters().findCounter(Counter.COMBINE_OUTPUT_RECORDS);
 
@@ -518,7 +516,7 @@
   private void runNewReducer(JobConf job,
                      final TaskUmbilicalProtocol umbilical,
-                     final Reporter reporter,
+                     final TaskReporter reporter,
                      RawKeyValueIterator rIter,
                      RawComparator comparator,
                      Class keyClass,
@@ -536,39 +534,14 @@
       (org.apache.hadoop.mapreduce.RecordWriter) outputFormat.getRecordWriter(taskContext);
     job.setBoolean("mapred.skip.on", isSkipping());
-    org.apache.hadoop.mapreduce.Reducer.Context
-         reducerContext = null;
-    try {
-      Constructor contextConstructor =
-        org.apache.hadoop.mapreduce.Reducer.Context.class.getConstructor
-        (new Class[]{org.apache.hadoop.mapreduce.Reducer.class,
-                     Configuration.class,
-                     org.apache.hadoop.mapreduce.TaskAttemptID.class,
-                     RawKeyValueIterator.class,
-                     org.apache.hadoop.mapreduce.RecordWriter.class,
-                     org.apache.hadoop.mapreduce.OutputCommitter.class,
-                     org.apache.hadoop.mapreduce.StatusReporter.class,
-                     RawComparator.class,
-                     Class.class,
-                     Class.class});
-
-      reducerContext = contextConstructor.newInstance(reducer, job,
-                                                      getTaskID(),
-                                                      rIter, output, committer,
-                                                      reporter, comparator,
-                                                      keyClass, valueClass);
-
-      reducer.run(reducerContext);
-      output.close(reducerContext);
-    } catch (NoSuchMethodException e) {
-      throw new IOException("Can't find Context constructor", e);
-    } catch (InstantiationException e) {
-      throw new IOException("Can't create Context", e);
-    } catch (InvocationTargetException e) {
-      throw new IOException("Can't invoke Context constructor", e);
-    } catch (IllegalAccessException e) {
-      throw new IOException("Can't invoke Context constructor", e);
-    }
+    org.apache.hadoop.mapreduce.Reducer.Context
+         reducerContext = createReduceContext(reducer, job, getTaskID(),
+                                              rIter, reduceInputValueCounter,
+                                              output, committer,
+                                              reporter, comparator, keyClass,
+                                              valueClass);
+    reducer.run(reducerContext);
+    output.close(reducerContext);
   }
 
   class ReduceCopier implements MRConstants {
@@ -722,14 +695,14 @@
     private volatile int maxFetchRetriesPerMap;
 
     /**
-     * Combiner class to run during in-memory merge, if defined.
+     * Combiner runner, if a combiner is needed
      */
-    private final Class combinerClass;
+    private CombinerRunner combinerRunner;
 
     /**
      * Resettable collector used for combine.
      */
-    private final CombineOutputCollector combineCollector;
+    private CombineOutputCollector combineCollector = null;
 
     /**
      * Maximum percent of failed fetch attempt before killing the reduce task.
@@ -1680,7 +1653,8 @@
     }
 
     public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
-                        TaskReporter reporter)throws IOException {
+                        TaskReporter reporter
+                        )throws ClassNotFoundException, IOException {
       configureClasspath(conf);
       this.reporter = reporter;
@@ -1693,10 +1667,15 @@
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
       this.maxInFlight = 4 * numCopiers;
       this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
-      this.combinerClass = conf.getCombinerClass();
-      combineCollector = (null != combinerClass)
-        ? new CombineOutputCollector(reduceCombineOutputCounter)
-        : null;
+      Counters.Counter combineInputCounter =
+        reporter.getCounter(Task.Counter.COMBINE_INPUT_RECORDS);
+      this.combinerRunner = CombinerRunner.create(conf, getTaskID(),
+                                                  combineInputCounter,
+                                                  reporter, null);
+      if (combinerRunner != null) {
+        combineCollector =
+          new CombineOutputCollector(reduceCombineOutputCounter);
+      }
       this.ioSortFactor = conf.getInt("io.sort.factor", 10);
       // the exponential backoff formula
@@ -2507,11 +2486,11 @@
                                            conf.getOutputKeyComparator(), reporter,
                                            spilledRecordsCounter, null);
-          if (null == combinerClass) {
+          if (combinerRunner == null) {
             Merger.writeFile(rIter, writer, reporter, conf);
           } else {
             combineCollector.setWriter(writer);
-            combineAndSpill(rIter, reduceCombineInputCounter);
+            combinerRunner.combine(rIter, combineCollector);
           }
           writer.close();
 
@@ -2536,29 +2515,6 @@
       }
     }
 
-    @SuppressWarnings("unchecked")
-    private void combineAndSpill(
-        RawKeyValueIterator kvIter,
-        Counters.Counter inCounter) throws IOException {
-      JobConf job = (JobConf)getConf();
-      Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
-      Class keyClass = job.getMapOutputKeyClass();
-      Class valClass = job.getMapOutputValueClass();
-      RawComparator comparator = job.getOutputKeyComparator();
-      try {
-        CombineValuesIterator values = new CombineValuesIterator(
-            kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
-            inCounter);
-        while (values.more()) {
-          combiner.reduce(values.getKey(), values, combineCollector,
-                          Reporter.NULL);
-          values.nextKey();
-        }
-      } finally {
-        combiner.close();
-      }
-    }
-
     private class GetMapEventsThread extends Thread {
 
       private IntWritable fromEventId = new IntWritable(0);

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java Thu Mar 19 00:00:58 2009
@@ -21,6 +21,8 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.text.NumberFormat;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -41,6 +43,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
@@ -986,4 +989,208 @@
     }
   }
 
+  private static final Constructor
+    contextConstructor;
+  static {
+    try {
+      contextConstructor =
+        org.apache.hadoop.mapreduce.Reducer.Context.class.getConstructor
+        (new Class[]{org.apache.hadoop.mapreduce.Reducer.class,
+                     Configuration.class,
+                     org.apache.hadoop.mapreduce.TaskAttemptID.class,
+                     RawKeyValueIterator.class,
+                     org.apache.hadoop.mapreduce.Counter.class,
+                     org.apache.hadoop.mapreduce.RecordWriter.class,
+                     org.apache.hadoop.mapreduce.OutputCommitter.class,
+                     org.apache.hadoop.mapreduce.StatusReporter.class,
+                     RawComparator.class,
+                     Class.class,
+                     Class.class});
+    } catch (NoSuchMethodException nme) {
+      throw new IllegalArgumentException("Can't find constructor");
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  protected static
+  org.apache.hadoop.mapreduce.Reducer.Context
+  createReduceContext(org.apache.hadoop.mapreduce.Reducer
+                        reducer,
+                      Configuration job,
+                      org.apache.hadoop.mapreduce.TaskAttemptID taskId,
+                      RawKeyValueIterator rIter,
+                      org.apache.hadoop.mapreduce.Counter inputCounter,
+                      org.apache.hadoop.mapreduce.RecordWriter output,
+                      org.apache.hadoop.mapreduce.OutputCommitter committer,
+                      org.apache.hadoop.mapreduce.StatusReporter reporter,
+                      RawComparator comparator,
+                      Class keyClass, Class valueClass
+  ) throws IOException, ClassNotFoundException {
+    try {
+
+      return contextConstructor.newInstance(reducer, job, taskId,
+                                            rIter, inputCounter, output,
+                                            committer, reporter, comparator,
+                                            keyClass, valueClass);
+    } catch (InstantiationException e) {
+      throw new IOException("Can't create Context", e);
+    } catch (InvocationTargetException e) {
+      throw new IOException("Can't invoke Context constructor", e);
+    } catch (IllegalAccessException e) {
+      throw new IOException("Can't invoke Context constructor", e);
+    }
+  }
+
+  protected static abstract class CombinerRunner {
+    protected final Counters.Counter inputCounter;
+    protected final JobConf job;
+    protected final TaskReporter reporter;
+
+    CombinerRunner(Counters.Counter inputCounter,
+                   JobConf job,
+                   TaskReporter reporter) {
+      this.inputCounter = inputCounter;
+      this.job = job;
+      this.reporter = reporter;
+    }
+
+    /**
+     * Run the combiner over a set of inputs.
+     * @param iterator the key/value pairs to use as input
+     * @param collector the output collector
+     */
+    abstract void combine(RawKeyValueIterator iterator,
+                          OutputCollector collector
+                         ) throws IOException, InterruptedException,
+                                  ClassNotFoundException;
+
+    static
+    CombinerRunner create(JobConf job,
+                          TaskAttemptID taskId,
+                          Counters.Counter inputCounter,
+                          TaskReporter reporter,
+                          org.apache.hadoop.mapreduce.OutputCommitter committer
+                         ) throws ClassNotFoundException {
+      Class> cls =
+        (Class>) job.getCombinerClass();
+      if (cls != null) {
+        return new OldCombinerRunner(cls, job, inputCounter, reporter);
+      }
+      // make a task context so we can get the classes
+      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+        new org.apache.hadoop.mapreduce.TaskAttemptContext(job, taskId);
+      Class> newcls =
+        (Class>)
+           taskContext.getCombinerClass();
+      if (newcls != null) {
+        return new NewCombinerRunner(newcls, job, taskId, taskContext,
+                                     inputCounter, reporter, committer);
+      }
+
+      return null;
+    }
+  }
+
+  protected static class OldCombinerRunner extends CombinerRunner {
+    private final Class> combinerClass;
+    private final Class keyClass;
+    private final Class valueClass;
+    private final RawComparator comparator;
+
+    protected OldCombinerRunner(Class> cls,
+                                JobConf conf,
+                                Counters.Counter inputCounter,
+                                TaskReporter reporter) {
+      super(inputCounter, conf, reporter);
+      combinerClass = cls;
+      keyClass = (Class) job.getMapOutputKeyClass();
+      valueClass = (Class) job.getMapOutputValueClass();
+      comparator = (RawComparator) job.getOutputKeyComparator();
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void combine(RawKeyValueIterator kvIter,
+                           OutputCollector combineCollector
+                           ) throws IOException {
+      Reducer combiner =
+        ReflectionUtils.newInstance(combinerClass, job);
+      try {
+        CombineValuesIterator values =
+          new CombineValuesIterator(kvIter, comparator, keyClass,
+                                    valueClass, job, Reporter.NULL,
+                                    inputCounter);
+        while (values.more()) {
+          combiner.reduce(values.getKey(), values, combineCollector,
+              Reporter.NULL);
+          values.nextKey();
+        }
+      } finally {
+        combiner.close();
+      }
+    }
+  }
+
+  protected static class NewCombinerRunner extends CombinerRunner {
+    private final Class>
+        reducerClass;
+    private final org.apache.hadoop.mapreduce.TaskAttemptID taskId;
+    private final RawComparator comparator;
+    private final Class keyClass;
+    private final Class valueClass;
+    private final org.apache.hadoop.mapreduce.OutputCommitter committer;
+
+    NewCombinerRunner(Class reducerClass,
+                      JobConf job,
+                      org.apache.hadoop.mapreduce.TaskAttemptID taskId,
+                      org.apache.hadoop.mapreduce.TaskAttemptContext context,
+                      Counters.Counter inputCounter,
+                      TaskReporter reporter,
+                      org.apache.hadoop.mapreduce.OutputCommitter committer) {
+      super(inputCounter, job, reporter);
+      this.reducerClass = reducerClass;
+      this.taskId = taskId;
+      keyClass = (Class) context.getMapOutputKeyClass();
+      valueClass = (Class) context.getMapOutputValueClass();
+      comparator = (RawComparator) context.getSortComparator();
+      this.committer = committer;
+    }
+
+    private static class OutputConverter
+        extends org.apache.hadoop.mapreduce.RecordWriter {
+      OutputCollector output;
+      OutputConverter(OutputCollector output) {
+        this.output = output;
+      }
+
+      @Override
+      public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context){
+      }
+
+      @Override
+      public void write(K key, V value
+                        ) throws IOException, InterruptedException {
+        output.collect(key,value);
+      }
+    }
+
+    @Override
+    void combine(RawKeyValueIterator iterator,
+                 OutputCollector collector
+                ) throws IOException, InterruptedException,
+                         ClassNotFoundException {
+      // make a reducer
+      org.apache.hadoop.mapreduce.Reducer reducer =
+        (org.apache.hadoop.mapreduce.Reducer)
+          ReflectionUtils.newInstance(reducerClass, job);
+      org.apache.hadoop.mapreduce.Reducer.Context
+           reducerContext = createReduceContext(reducer, job, taskId,
+                                                iterator, inputCounter,
+                                                new OutputConverter(collector),
+                                                committer,
+                                                reporter, comparator, keyClass,
+                                                valueClass);
+      reducer.run(reducerContext);
+    }
+
+  }
 }

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java Thu Mar 19 00:00:58 2009
@@ -365,7 +365,7 @@
    * @return the counters for this job.
    * @throws IOException
    */
-  public Iterable getCounters() throws IOException {
+  public Counters getCounters() throws IOException {
     ensureState(JobState.RUNNING);
     return new Counters(info.getCounters());
   }
@@ -385,7 +385,6 @@
     int numReduces = conf.getNumReduceTasks();
     String oldMapperClass = "mapred.mapper.class";
     String oldReduceClass = "mapred.reducer.class";
-    String oldCombineClass = "mapred.combiner.class";
     conf.setBooleanIfUnset("mapred.mapper.new-api",
                            conf.get(oldMapperClass) == null);
     if (conf.getUseNewMapper()) {
@@ -393,7 +392,6 @@
       ensureNotSet("mapred.input.format.class", mode);
       ensureNotSet(oldMapperClass, mode);
       if (numReduces != 0) {
-        ensureNotSet(oldCombineClass, mode);
         ensureNotSet("mapred.partitioner.class", mode);
       } else {
         ensureNotSet("mapred.output.format.class", mode);
@@ -403,7 +401,6 @@
       ensureNotSet(JobContext.INPUT_FORMAT_CLASS_ATTR, mode);
       ensureNotSet(JobContext.MAP_CLASS_ATTR, mode);
       if (numReduces != 0) {
-        ensureNotSet(JobContext.COMBINE_CLASS_ATTR, mode);
         ensureNotSet(JobContext.PARTITIONER_CLASS_ATTR, mode);
       } else {
         ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
@@ -416,12 +413,10 @@
         String mode = "new reduce API";
         ensureNotSet("mapred.output.format.class", mode);
         ensureNotSet(oldReduceClass, mode);
-        ensureNotSet(oldCombineClass, mode);
       } else {
         String mode = "reduce compatability";
         ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
         ensureNotSet(JobContext.REDUCE_CLASS_ATTR, mode);
-        ensureNotSet(JobContext.COMBINE_CLASS_ATTR, mode);
       }
     }
   }

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java Thu Mar 19 00:00:58 2009
@@ -41,6 +41,7 @@
 public class ReduceContext
     extends TaskInputOutputContext {
   private RawKeyValueIterator input;
+  private Counter inputCounter;
   private RawComparator comparator;
   private KEYIN key;                                  // current key
   private VALUEIN value;                              // current value
@@ -56,6 +57,7 @@
   public ReduceContext(Configuration conf, TaskAttemptID taskid,
                        RawKeyValueIterator input,
+                       Counter inputCounter,
                        RecordWriter output,
                        OutputCommitter committer,
                        StatusReporter reporter,
@@ -65,6 +67,7 @@
                       ) throws InterruptedException, IOException{
     super(conf, taskid, output, committer, reporter);
     this.input = input;
+    this.inputCounter = inputCounter;
     this.comparator = comparator;
     SerializationFactory serializationFactory = new SerializationFactory(conf);
     this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
@@ -106,6 +109,7 @@
       buffer.reset(next.getData(), next.getPosition(), next.getLength());
       value = valueDeserializer.deserialize(value);
       hasMore = input.next();
+      inputCounter.increment(1);
       if (hasMore) {
         next = input.getKey();
         nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java Thu Mar 19 00:00:58 2009
@@ -121,6 +121,7 @@
     extends ReduceContext {
     public Context(Configuration conf, TaskAttemptID taskid,
                    RawKeyValueIterator input,
+                   Counter inputCounter,
                    RecordWriter output,
                    OutputCommitter committer,
                    StatusReporter reporter,
@@ -128,8 +129,8 @@
                    Class keyClass,
                    Class valueClass
                    ) throws IOException, InterruptedException {
-      super(conf, taskid, input, output, committer, reporter, comparator,
-            keyClass, valueClass);
+      super(conf, taskid, input, inputCounter, output, committer, reporter,
+            comparator, keyClass, valueClass);
     }
   }

Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Thu Mar 19 00:00:58 2009
@@ -96,9 +96,10 @@
                            ) throws IOException,
                                     InterruptedException,
                                     ClassNotFoundException {
+    final String COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter";
     localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
     localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-    writeFile("in/part1", "this is a test\nof word count\n");
+    writeFile("in/part1", "this is a test\nof word count test\ntest\n");
     writeFile("in/part2", "more test");
     Job job = new Job(conf, "word count");
     job.setJarByClass(WordCount.class);
@@ -112,8 +113,21 @@
     assertTrue(job.waitForCompletion());
     String out = readFile("out/part-r-00000");
     System.out.println(out);
-    assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t2\nthis\t1\nword\t1\n",
+    assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t4\nthis\t1\nword\t1\n",
                  out);
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+    long combineIn = ctrs.findCounter(COUNTER_GROUP,
+                                      "COMBINE_INPUT_RECORDS").getValue();
+    long combineOut = ctrs.findCounter(COUNTER_GROUP,
+                                       "COMBINE_OUTPUT_RECORDS").getValue();
+    long reduceIn = ctrs.findCounter(COUNTER_GROUP,
+                                     "REDUCE_INPUT_RECORDS").getValue();
+    long mapOut = ctrs.findCounter(COUNTER_GROUP,
+                                   "MAP_OUTPUT_RECORDS").getValue();
+    assertEquals("map out = combine in", mapOut, combineIn);
+    assertEquals("combine out = reduce in", combineOut, reduceIn);
+    assertTrue("combine in > combine out", combineIn > combineOut);
   }
 
   private void runSecondarySort(Configuration conf) throws IOException,