Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 015EA10A33 for ; Fri, 20 Sep 2013 22:47:06 +0000 (UTC) Received: (qmail 22765 invoked by uid 500); 20 Sep 2013 22:45:50 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 22488 invoked by uid 500); 20 Sep 2013 22:44:50 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 22480 invoked by uid 99); 20 Sep 2013 22:44:49 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Sep 2013 22:44:49 +0000 X-ASF-Spam-Status: No, hits=-2000.6 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 20 Sep 2013 22:44:46 +0000 Received: (qmail 22395 invoked by uid 99); 20 Sep 2013 22:44:22 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Sep 2013 22:44:22 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id B641C8AF8D0; Fri, 20 Sep 2013 22:44:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.incubator.apache.org Message-Id: <5c2f6f509b3c46d0a0b9dcc633347606@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-433. Change Combiner to work with new APIs (part of TEZ-398). 
(sseth) Date: Fri, 20 Sep 2013 22:44:21 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/TEZ-398 85a9d46e3 -> eb0f6ffe5 TEZ-433. Change Combiner to work with new APIs (part of TEZ-398). (sseth) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/eb0f6ffe Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/eb0f6ffe Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/eb0f6ffe Branch: refs/heads/TEZ-398 Commit: eb0f6ffe5250b497f35909f8559682cbaa3621ae Parents: 85a9d46 Author: Siddharth Seth Authored: Fri Sep 20 15:43:56 2013 -0700 Committer: Siddharth Seth Committed: Fri Sep 20 15:43:56 2013 -0700 ---------------------------------------------------------------------- .../org/apache/tez/common/TezJobConfig.java | 5 + .../apache/tez/engine/common/ConfigUtils.java | 4 + .../tez/engine/common/TezEngineUtils.java | 39 +++ .../tez/engine/common/ValuesIterator.java | 2 + .../tez/engine/common/combine/Combiner.java | 43 ++++ .../common/shuffle/impl/MergeManager.java | 41 +--- .../tez/engine/common/shuffle/impl/Shuffle.java | 10 +- .../engine/common/sort/impl/ExternalSorter.java | 45 +--- .../common/sort/impl/PipelinedSorter.java | 15 +- .../common/sort/impl/dflt/DefaultSorter.java | 4 +- .../engine/common/task/impl/ValuesIterator.java | 9 +- .../engine/lib/input/ShuffledMergedInput.java | 1 - .../apache/tez/mapreduce/hadoop/MRHelpers.java | 13 + .../hadoop/MultiStageMRConfToTezTranslator.java | 19 +- .../tez/mapreduce/newcombine/MRCombiner.java | 242 +++++++++++++++++++ .../mapreduce/newpartition/MRPartitioner.java | 3 +- .../mapreduce/newprocessor/MRTaskReporter.java | 7 + 17 files changed, 411 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java 
---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java index 12c2b4b..7c4540c 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java @@ -128,6 +128,11 @@ public class TezJobConfig { */ public static final String TEZ_ENGINE_PARTITIONER_CLASS = "tez.engine.partitioner.class"; + /** + * Specifies a combiner class (primarily for Shuffle) + */ + public static final String TEZ_ENGINE_COMBINER_CLASS = "tez.engine.combiner.class"; + public static final String TEZ_ENGINE_NUM_EXPECTED_PARTITIONS = "tez.engine.num.expected.partitions"; /** http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java index a92cf1b..f73adfd 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java @@ -140,5 +140,9 @@ public class ConfigUtils { return ReflectionUtils.newInstance(theClass, conf); } + + public static boolean useNewApi(Configuration conf) { + return conf.getBoolean("mapred.mapper.new-api", false); + } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java index f352e08..3920ce6 100644 --- 
a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java @@ -29,9 +29,11 @@ import org.apache.tez.common.Constants; import org.apache.tez.common.TezJobConfig; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.engine.api.Partitioner; +import org.apache.tez.engine.common.combine.Combiner; import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput; import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles; import org.apache.tez.engine.newapi.TezOutputContext; +import org.apache.tez.engine.newapi.TezTaskContext; public class TezEngineUtils { @@ -55,6 +57,43 @@ public class TezEngineUtils { } @SuppressWarnings("unchecked") + public static Combiner instantiateCombiner(Configuration conf, TezTaskContext taskContext) throws IOException { + Class clazz; + String className = conf.get(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS); + if (className == null) { + LOG.info("No combiner specified via " + TezJobConfig.TEZ_ENGINE_COMBINER_CLASS + ". 
Combiner will not be used"); + return null; + } + LOG.info("Using Combiner class: " + className); + try { + clazz = (Class) conf.getClassByName(className); + } catch (ClassNotFoundException e) { + throw new IOException("Unable to load combiner class: " + className); + } + + Combiner combiner = null; + + Constructor ctor; + try { + ctor = clazz.getConstructor(TezTaskContext.class); + combiner = ctor.newInstance(taskContext); + } catch (SecurityException e) { + throw new IOException(e); + } catch (NoSuchMethodException e) { + throw new IOException(e); + } catch (IllegalArgumentException e) { + throw new IOException(e); + } catch (InstantiationException e) { + throw new IOException(e); + } catch (IllegalAccessException e) { + throw new IOException(e); + } catch (InvocationTargetException e) { + throw new IOException(e); + } + return combiner; + } + + @SuppressWarnings("unchecked") public static Partitioner instantiatePartitioner(Configuration conf) throws IOException { Class clazz; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java index a33d00b..b7867aa 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java @@ -101,6 +101,8 @@ public class ValuesIterator { return key; } + // TODO NEWTEZ Maybe add another method which returns an iterator instead of iterable + public Iterable getValues() { return new Iterable() { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java ---------------------------------------------------------------------- diff --git 
a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java new file mode 100644 index 0000000..6f86d61 --- /dev/null +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.engine.common.combine; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.tez.common.TezJobConfig; +import org.apache.tez.common.TezTaskContext; +import org.apache.tez.engine.common.sort.impl.IFile.Writer; +import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator; + +/** + *Combiner Initialization

The Combiner class is picked up + * using the TEZ_ENGINE_COMBINER_CLASS attribute in {@link TezJobConfig} + * + * + * Combiners need to provide a single argument ({@link TezTaskContext}) + * constructor. + */ +@Unstable +@LimitedPrivate("mapreduce") +public interface Combiner { + public void combine(TezRawKeyValueIterator rawIter, Writer writer) + throws InterruptedException, IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java index 093a293..ad9bb5f 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java @@ -48,6 +48,7 @@ import org.apache.tez.common.TezJobConfig; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.engine.common.ConfigUtils; import org.apache.tez.engine.common.InputAttemptIdentifier; +import org.apache.tez.engine.common.combine.Combiner; import org.apache.tez.engine.common.sort.impl.IFile; import org.apache.tez.engine.common.sort.impl.TezMerger; import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator; @@ -55,7 +56,6 @@ import org.apache.tez.engine.common.sort.impl.IFile.Writer; import org.apache.tez.engine.common.sort.impl.TezMerger.Segment; import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles; import org.apache.tez.engine.hadoop.compat.NullProgressable; -import org.apache.tez.engine.newapi.Processor; import org.apache.tez.engine.newapi.TezInputContext; @InterfaceAudience.Private @@ -72,7 +72,7 @@ public class MergeManager { private final TezTaskOutputFiles mapOutputFile; private final
Progressable nullProgressable = new NullProgressable(); - private final Processor combineProcessor = null; // TODO NEWTEZ Fix CombineProcessor + private final Combiner combiner; Set inMemoryMergedMapOutputs = new TreeSet(new MapOutput.MapOutputComparator()); @@ -98,12 +98,6 @@ public class MergeManager { private final ExceptionReporter exceptionReporter; private final TezInputContext inputContext; - - /** - * Combiner processor to run during in-memory merge, if defined. - */ - // TODO NEWTEZ Fix Combiner - //private final Processor combineProcessor; private final TezCounter spilledRecordsCounter; @@ -119,18 +113,18 @@ public class MergeManager { FileSystem localFS, LocalDirAllocator localDirAllocator, TezInputContext inputContext, - Processor combineProcessor, + Combiner combiner, TezCounter spilledRecordsCounter, TezCounter reduceCombineInputCounter, TezCounter mergedMapOutputsCounter, ExceptionReporter exceptionReporter) { - // TODO NEWTEZ Change to include Combiner this.inputContext = inputContext; this.conf = conf; this.localDirAllocator = localDirAllocator; this.exceptionReporter = exceptionReporter; - //this.combineProcessor = combineProcessor; + this.combiner = combiner; + this.reduceCombineInputCounter = reduceCombineInputCounter; this.spilledRecordsCounter = spilledRecordsCounter; this.mergedMapOutputsCounter = mergedMapOutputsCounter; @@ -370,27 +364,8 @@ public class MergeManager { } void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer) - throws IOException, InterruptedException { - - // TODO NEWTEZ Fix CombineProcessor - -// CombineInput combineIn = new CombineInput(kvIter); -// combineIn.initialize(conf, reporter); -// -// CombineOutput combineOut = new CombineOutput(writer); -// combineOut.initialize(conf, reporter); -// -// try { -// combineProcessor.process(new Input[] {combineIn}, -// new Output[] {combineOut}); -// } catch (IOException ioe) { -// try { -// combineProcessor.close(); -// } catch (IOException ignoredException) {} -// 
-// throw ioe; -// } - + throws IOException, InterruptedException { + combiner.combine(kvIter, writer); } private class IntermediateMemoryToMemoryMerger @@ -500,7 +475,7 @@ public class MergeManager { (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), nullProgressable, spilledRecordsCounter, null, null); - if (null == combineProcessor) { + if (null == combiner) { TezMerger.writeFile(rIter, writer, nullProgressable, conf); } else { runCombineProcessor(rIter, writer); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java index 620c620..f605b7c 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java @@ -38,6 +38,8 @@ import org.apache.tez.common.TezJobConfig; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.engine.common.TezEngineUtils; +import org.apache.tez.engine.common.combine.Combiner; import org.apache.tez.engine.common.shuffle.server.ShuffleHandler; import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator; import org.apache.tez.engine.newapi.Event; @@ -67,11 +69,10 @@ public class Shuffle implements ExceptionReporter { private final SecretKey jobTokenSecret; private AtomicInteger reduceRange = new AtomicInteger( TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT); - + private FutureTask runShuffleFuture; public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException { - // TODO NEWTEZ Handle Combiner 
this.inputContext = inputContext; this.conf = conf; this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(), @@ -84,6 +85,8 @@ public class Shuffle implements ExceptionReporter { .getJobTokenSecretFromTokenBytes(inputContext .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID)); + Combiner combiner = TezEngineUtils.instantiateCombiner(conf, inputContext); + FileSystem localFS = FileSystem.getLocal(this.conf); LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS); @@ -123,7 +126,7 @@ public class Shuffle implements ExceptionReporter { localFS, localDirAllocator, inputContext, - null, // TODO NEWTEZ Fix Combiner + combiner, spilledRecordsCounter, reduceCombineInputCounter, mergedMapOutputsCounter, @@ -272,5 +275,4 @@ public class Shuffle implements ExceptionReporter { throw e; } } - } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java index 8b20192..1b5e015 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java @@ -43,15 +43,14 @@ import org.apache.tez.common.TezJobConfig; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.engine.api.Partitioner; -import org.apache.tez.engine.api.Processor; import org.apache.tez.engine.common.ConfigUtils; import org.apache.tez.engine.common.TezEngineUtils; +import org.apache.tez.engine.common.combine.Combiner; import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader; import 
org.apache.tez.engine.common.sort.impl.IFile.Writer; import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput; import org.apache.tez.engine.hadoop.compat.NullProgressable; import org.apache.tez.engine.newapi.TezOutputContext; -import org.apache.tez.engine.records.OutputContext; @SuppressWarnings({"unchecked", "rawtypes"}) public abstract class ExternalSorter { @@ -66,7 +65,7 @@ public abstract class ExternalSorter { protected Progressable nullProgressable = new NullProgressable(); protected TezOutputContext outputContext; - protected Processor combineProcessor; + protected Combiner combiner; protected Partitioner partitioner; protected Configuration conf; protected FileSystem rfs; @@ -84,9 +83,6 @@ public abstract class ExternalSorter { // Compression for map-outputs protected CompressionCodec codec; - // TODO NEWTEZ Setup CombineProcessor - // TODO NEWTEZ Setup Partitioner in SimpleOutput - // Counters // TODO TEZ Rename all counter variables [Mapping of counter to MR for compatibility in the MR layer] protected TezCounter mapOutputByteCounter; @@ -139,12 +135,7 @@ public abstract class ExternalSorter { LOG.info("Instantiating Partitioner: [" + conf.get(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS) + "]"); this.conf.setInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, this.partitions); this.partitioner = TezEngineUtils.instantiatePartitioner(this.conf); - } - - // TODO NEWTEZ Add an interface (! Processor) for CombineProcessor, which MR tasks can initialize and set. - // Alternately add a config key with a classname, which is easy to initialize. - public void setCombiner(Processor combineProcessor) { - this.combineProcessor = combineProcessor; + this.combiner = TezEngineUtils.instantiateCombiner(this.conf, outputContext); } /** @@ -165,27 +156,11 @@ public abstract class ExternalSorter { protected void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer) throws IOException { - - // TODO NEWTEZ Fix Combiner. 
-// CombineInput combineIn = new CombineInput(kvIter); -// combineIn.initialize(job, runningTaskContext.getTaskReporter()); -// -// CombineOutput combineOut = new CombineOutput(writer); -// combineOut.initialize(job, runningTaskContext.getTaskReporter()); -// -// try { -// combineProcessor.process(new Input[] {combineIn}, -// new Output[] {combineOut}); -// } catch (IOException ioe) { -// try { -// combineProcessor.close(); -// } catch (IOException ignored) {} -// -// // Do not close output here as the sorter should close the combine output -// -// throw ioe; -// } - + try { + combiner.combine(kvIter, writer); + } catch (InterruptedException e) { + throw new IOException(e); + } } /** @@ -216,8 +191,4 @@ public abstract class ExternalSorter { public ShuffleHeader getShuffleHeader(int reduce) { throw new UnsupportedOperationException("getShuffleHeader isn't supported!"); } - - public OutputContext getOutputContext() { - return null; - } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java index bafbd4d..952568e 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java @@ -45,10 +45,9 @@ import org.apache.hadoop.util.IndexedSorter; import org.apache.hadoop.util.Progress; import org.apache.tez.common.TezJobConfig; import org.apache.tez.engine.common.ConfigUtils; -import org.apache.tez.engine.newapi.TezOutputContext; -import org.apache.tez.engine.records.OutputContext; import org.apache.tez.engine.common.sort.impl.IFile.Writer; import org.apache.tez.engine.common.sort.impl.TezMerger.Segment; 
+import org.apache.tez.engine.newapi.TezOutputContext; @SuppressWarnings({"unchecked", "rawtypes"}) public class PipelinedSorter extends ExternalSorter { @@ -270,7 +269,7 @@ public class PipelinedSorter extends ExternalSorter { new Writer(conf, out, keyClass, valClass, codec, spilledRecordsCounter); writer.setRLE(merger.needsRLE()); - if (combineProcessor == null) { + if (combiner == null) { while(kvIter.next()) { writer.append(kvIter.getKey(), kvIter.getValue()); } @@ -380,10 +379,10 @@ public class PipelinedSorter extends ExternalSorter { new Writer(conf, finalOut, keyClass, valClass, codec, spilledRecordsCounter); writer.setRLE(merger.needsRLE()); - if (combineProcessor == null || numSpills < minSpillsForCombine) { + if (combiner == null || numSpills < minSpillsForCombine) { TezMerger.writeFile(kvIter, writer, nullProgressable, conf); } else { - runCombineProcessor(kvIter, writer); + runCombineProcessor(kvIter, writer); } //close @@ -930,10 +929,4 @@ public class PipelinedSorter extends ExternalSorter { } } - - @Override - public OutputContext getOutputContext() { - return null; - } - } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java index b1e17e7..1ad31f7 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java @@ -752,7 +752,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { long segmentStart = out.getPos(); writer = new Writer(conf, out, keyClass, valClass, codec, spilledRecordsCounter); - if (combineProcessor == null) { + if 
(combiner == null) { // spill directly DataInputBuffer key = new DataInputBuffer(); while (spindex < mend && @@ -1082,7 +1082,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { Writer writer = new Writer(conf, finalOut, keyClass, valClass, codec, spilledRecordsCounter); - if (combineProcessor == null || numSpills < minSpillsForCombine) { + if (combiner == null || numSpills < minSpillsForCombine) { TezMerger.writeFile(kvIter, writer, nullProgressable, conf); } else { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java index 126c5b2..841e54d 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java @@ -30,7 +30,14 @@ import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.util.Progressable; import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator; -/** Iterates values while keys match in sorted input. */ + +/** + * Iterates values while keys match in sorted input. + * + * Usage: Call moveToNext to move to the next k, v pair. This returns true if another exists, + * followed by getKey() and getValues() to get the current key and list of values. 
+ * + */ public class ValuesIterator implements Iterator { protected TezRawKeyValueIterator in; //input iterator private KEY key; // current key http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java index 0732e20..91bb6d5 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java @@ -165,7 +165,6 @@ public class ShuffledMergedInput implements LogicalInput { } - // This functionality is currently broken. If there's inputs which need to be // written to disk, there's a possibility that inputs from the different // sources could clobber each others' output. 
Also the current structures do http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java index 2f4a62a..7a9b7e0 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java @@ -59,6 +59,7 @@ import org.apache.tez.common.TezJobConfig; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; +import org.apache.tez.mapreduce.newcombine.MRCombiner; import org.apache.tez.mapreduce.newpartition.MRPartitioner; @@ -367,6 +368,18 @@ public class MRHelpers { // TODO eventually ACLs conf.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, MRPartitioner.class.getName()); + + boolean useNewApi = conf.getBoolean("mapred.mapper.new-api", false); + if (useNewApi) { + if (conf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null) { + conf.set(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS, MRCombiner.class.getName()); + } + } else { + if (conf.get("mapred.combiner.class") != null) { + conf.set(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS, MRCombiner.class.getName()); + } + } + setWorkingDirectory(conf); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java index 6b68e95..ad231b3 100644 --- 
a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java @@ -18,7 +18,6 @@ package org.apache.tez.mapreduce.hadoop; -import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; @@ -31,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; import org.apache.tez.common.TezJobConfig; import org.apache.tez.mapreduce.hadoop.DeprecatedKeys.MultiStageKeys; +import org.apache.tez.mapreduce.newcombine.MRCombiner; import org.apache.tez.mapreduce.newpartition.MRPartitioner; import com.google.common.base.Preconditions; @@ -220,22 +220,39 @@ public class MultiStageMRConfToTezTranslator { // Assuming no 0 map jobs, and the first stage is always a map. int numStages = numIntermediateStages + (hasFinalReduceStage ? 2 : 1); + // Setup Tez partitioner class conf.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, MRPartitioner.class.getName()); + + // Setup Tez Combiner class if required. 
+ // This would already have been set since the call is via JobClient + boolean useNewApi = conf.getBoolean("mapred.mapper.new-api", false); + if (useNewApi) { + if (conf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null) { + conf.set(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS, MRCombiner.class.getName()); + } + } else { + if (conf.get("mapred.combiner.class") != null) { + conf.set(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS, MRCombiner.class.getName()); + } + } Configuration confs[] = new Configuration[numStages]; Configuration nonItermediateConf = MultiStageMRConfigUtil.extractStageConf( conf, ""); + confs[0].setBoolean(MRConfig.IS_MAP_PROCESSOR, true); if (numStages == 1) { confs[0] = nonItermediateConf; } else { confs[0] = nonItermediateConf; confs[numStages - 1] = new Configuration(nonItermediateConf); + confs[numStages -1].setBoolean(MRConfig.IS_MAP_PROCESSOR, false); } if (numStages > 2) { for (int i = 1; i < numStages - 1; i++) { confs[i] = MultiStageMRConfigUtil.extractStageConf(conf, MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i, "")); + confs[i].setBoolean(MRConfig.IS_MAP_PROCESSOR, false); } } return confs; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newcombine/MRCombiner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newcombine/MRCombiner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newcombine/MRCombiner.java new file mode 100644 index 0000000..788019a --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newcombine/MRCombiner.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.mapreduce.newcombine; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RawKeyValueIterator; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.ReduceContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskCounter; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer; +import org.apache.hadoop.mapreduce.task.ReduceContextImpl; +import org.apache.hadoop.util.Progress; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.tez.common.TezUtils; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.engine.common.ConfigUtils; +import org.apache.tez.engine.common.ValuesIterator; +import org.apache.tez.engine.common.combine.Combiner; +import org.apache.tez.engine.common.sort.impl.IFile.Writer; +import
org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator; +import org.apache.tez.engine.newapi.TezInputContext; +import org.apache.tez.engine.newapi.TezOutputContext; +import org.apache.tez.engine.newapi.impl.TezTaskContextImpl; +import org.apache.tez.mapreduce.hadoop.MRConfig; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; +import org.apache.tez.mapreduce.hadoop.mapred.MRCounters; +import org.apache.tez.mapreduce.newprocessor.MRTaskReporter; + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class MRCombiner implements Combiner { + + private static final Log LOG = LogFactory.getLog(MRCombiner.class); + + private final Configuration conf; + private final Class keyClass; + private final Class valClass; + private final RawComparator comparator; + private final boolean useNewApi; + + private final TezCounter combineInputKeyCounter; + private final TezCounter combineInputValueCounter; + + private final MRTaskReporter reporter; + private final TaskAttemptID mrTaskAttemptID; + + public MRCombiner(TezTaskContextImpl taskContext) throws IOException { + this.conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload()); + + assert(taskContext instanceof TezInputContext || taskContext instanceof TezOutputContext); + if (taskContext instanceof TezOutputContext) { + this.keyClass = ConfigUtils.getIntermediateOutputKeyClass(conf); + this.valClass = ConfigUtils.getIntermediateOutputValueClass(conf); + this.comparator = ConfigUtils.getIntermediateOutputKeyComparator(conf); + this.reporter = new MRTaskReporter((TezOutputContext)taskContext); + } else { + this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf); + this.valClass = ConfigUtils.getIntermediateInputValueClass(conf); + this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf); + this.reporter = new MRTaskReporter((TezInputContext)taskContext); + } + + this.useNewApi = ConfigUtils.useNewApi(conf); + + combineInputKeyCounter =
taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS); + combineInputValueCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); + + boolean isMap = conf.getBoolean(MRConfig.IS_MAP_PROCESSOR,false); + this.mrTaskAttemptID = new TaskAttemptID( + new TaskID(String.valueOf(taskContext.getApplicationId() + .getClusterTimestamp()), taskContext.getApplicationId().getId(), + isMap ? TaskType.MAP : TaskType.REDUCE, + taskContext.getTaskIndex()), taskContext.getTaskAttemptNumber()); + + LOG.info("Using combineKeyClass: " + keyClass + ", combineValueClass: " + valClass + ", combineComparator: " +comparator + ", useNewApi: " + useNewApi); + } + + @Override + public void combine(TezRawKeyValueIterator rawIter, Writer writer) + throws InterruptedException, IOException { + if (useNewApi) { + runNewCombiner(rawIter, writer); + } else { + runOldCombiner(rawIter, writer); + } + + } + + ///////////////// Methods for old API ////////////////////// + + private void runOldCombiner(final TezRawKeyValueIterator rawIter, final Writer writer) throws IOException { + Class reducerClazz = (Class) conf.getClass("mapred.combiner.class", null, Reducer.class); + + Reducer combiner = ReflectionUtils.newInstance(reducerClazz, conf); + + OutputCollector collector = new OutputCollector() { + @Override + public void collect(Object key, Object value) throws IOException { + writer.append(key, value); + } + }; + + CombinerValuesIterator values = new CombinerValuesIterator(rawIter, keyClass, valClass, comparator); + + while (values.moveToNext()) { + combiner.reduce(values.getKey(), values.getValues().iterator(), collector, reporter); + } + } + + private final class CombinerValuesIterator extends ValuesIterator { + public CombinerValuesIterator(TezRawKeyValueIterator rawIter, + Class keyClass, Class valClass, + RawComparator comparator) throws IOException { + super(rawIter, comparator, keyClass, valClass, conf, + combineInputKeyCounter,
combineInputValueCounter); + } + } + + ///////////////// End of methods for old API ////////////////////// + + ///////////////// Methods for new API ////////////////////// + + private void runNewCombiner(final TezRawKeyValueIterator rawIter, final Writer writer) throws InterruptedException, IOException { + + RecordWriter recordWriter = new RecordWriter() { + + @Override + public void write(Object key, Object value) throws IOException, + InterruptedException { + writer.append(key, value); + } + + @Override + public void close(TaskAttemptContext context) throws IOException, + InterruptedException { + // Will be closed by whoever invokes the combiner. + } + }; + + Class reducerClazz = (Class) conf + .getClass(MRJobConfig.COMBINE_CLASS_ATTR, null, + org.apache.hadoop.mapreduce.Reducer.class); + org.apache.hadoop.mapreduce.Reducer reducer = ReflectionUtils.newInstance(reducerClazz, conf); + + org.apache.hadoop.mapreduce.Reducer.Context reducerContext = + createReduceContext( + conf, + mrTaskAttemptID, + rawIter, + new MRCounters.MRCounter(combineInputKeyCounter), + new MRCounters.MRCounter(combineInputValueCounter), + recordWriter, + reporter, + (RawComparator)comparator, + keyClass, + valClass); + + reducer.run(reducerContext); + recordWriter.close(reducerContext); + } + + private static org.apache.hadoop.mapreduce.Reducer.Context createReduceContext( + Configuration conf, + TaskAttemptID mrTaskAttemptID, + final TezRawKeyValueIterator rawIter, + Counter combineInputKeyCounter, + Counter combineInputValueCounter, + RecordWriter recordWriter, + MRTaskReporter reporter, + RawComparator comparator, + Class keyClass, + Class valClass) throws InterruptedException, IOException { + + RawKeyValueIterator r = new RawKeyValueIterator() { + + @Override + public boolean next() throws IOException { + return rawIter.next(); + } + + @Override + public DataInputBuffer getValue() throws IOException { + return rawIter.getValue(); + } + + @Override + public Progress getProgress() { +
return rawIter.getProgress(); + } + + @Override + public DataInputBuffer getKey() throws IOException { + return rawIter.getKey(); + } + + @Override + public void close() throws IOException { + rawIter.close(); + } + }; + + ReduceContext rContext = new ReduceContextImpl( + conf, mrTaskAttemptID, r, combineInputKeyCounter, + combineInputValueCounter, recordWriter, null, reporter, comparator, + keyClass, valClass); + + org.apache.hadoop.mapreduce.Reducer.Context reducerContext = new WrappedReducer() + .getReducerContext(rContext); + return reducerContext; + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java index 4a967ad..dcea35c 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.ReflectionUtils; import org.apache.tez.common.TezJobConfig; +import org.apache.tez.engine.common.ConfigUtils; import org.apache.tez.mapreduce.hadoop.MRJobConfig; @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -38,7 +39,7 @@ public class MRPartitioner implements org.apache.tez.engine.api.Partitioner { private org.apache.hadoop.mapred.Partitioner oldPartitioner; public MRPartitioner(Configuration conf) { - this.useNewApi = conf.getBoolean("mapred.mapper.new-api", false); + this.useNewApi = ConfigUtils.useNewApi(conf); this.partitions = conf.getInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, 1); if (useNewApi) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java index d01e562..c7c9567 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java @@ -24,6 +24,7 @@ import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.Reporter; import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.engine.newapi.TezInputContext; import org.apache.tez.engine.newapi.TezOutputContext; import org.apache.tez.engine.newapi.TezProcessorContext; import org.apache.tez.engine.newapi.TezTaskContext; @@ -53,6 +54,12 @@ public class MRTaskReporter this.reporter = new MRReporter(context); this.isProcessorContext = false; } + + public MRTaskReporter(TezInputContext context) { + this.context = context; + this.reporter = new MRReporter(context); + this.isProcessorContext = false; + } public void setProgress(float progress) { if (isProcessorContext) {