Return-Path:
X-Original-To: apmail-tez-commits-archive@minotaur.apache.org
Delivered-To: apmail-tez-commits-archive@minotaur.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
by minotaur.apache.org (Postfix) with SMTP id 015EA10A33
for ;
Fri, 20 Sep 2013 22:47:06 +0000 (UTC)
Received: (qmail 22765 invoked by uid 500); 20 Sep 2013 22:45:50 -0000
Delivered-To: apmail-tez-commits-archive@tez.apache.org
Received: (qmail 22488 invoked by uid 500); 20 Sep 2013 22:44:50 -0000
Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@tez.incubator.apache.org
Delivered-To: mailing list commits@tez.incubator.apache.org
Received: (qmail 22480 invoked by uid 99); 20 Sep 2013 22:44:49 -0000
Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136)
by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Sep 2013 22:44:49 +0000
X-ASF-Spam-Status: No, hits=-2000.6 required=5.0
tests=ALL_TRUSTED,RP_MATCHES_RCVD
X-Spam-Check-By: apache.org
Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3)
by apache.org (qpsmtpd/0.29) with SMTP; Fri, 20 Sep 2013 22:44:46 +0000
Received: (qmail 22395 invoked by uid 99); 20 Sep 2013 22:44:22 -0000
Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org)
(140.211.11.114)
by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Sep 2013 22:44:22 +0000
Received: by tyr.zones.apache.org (Postfix, from userid 65534)
id B641C8AF8D0; Fri, 20 Sep 2013 22:44:21 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: sseth@apache.org
To: commits@tez.incubator.apache.org
Message-Id: <5c2f6f509b3c46d0a0b9dcc633347606@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: git commit: TEZ-433. Change Combiner to work with new APIs (part of
TEZ-398). (sseth)
Date: Fri, 20 Sep 2013 22:44:21 +0000 (UTC)
X-Virus-Checked: Checked by ClamAV on apache.org
Updated Branches:
refs/heads/TEZ-398 85a9d46e3 -> eb0f6ffe5
TEZ-433. Change Combiner to work with new APIs (part of TEZ-398).
(sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/eb0f6ffe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/eb0f6ffe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/eb0f6ffe
Branch: refs/heads/TEZ-398
Commit: eb0f6ffe5250b497f35909f8559682cbaa3621ae
Parents: 85a9d46
Author: Siddharth Seth
Authored: Fri Sep 20 15:43:56 2013 -0700
Committer: Siddharth Seth
Committed: Fri Sep 20 15:43:56 2013 -0700
----------------------------------------------------------------------
.../org/apache/tez/common/TezJobConfig.java | 5 +
.../apache/tez/engine/common/ConfigUtils.java | 4 +
.../tez/engine/common/TezEngineUtils.java | 39 +++
.../tez/engine/common/ValuesIterator.java | 2 +
.../tez/engine/common/combine/Combiner.java | 43 ++++
.../common/shuffle/impl/MergeManager.java | 41 +---
.../tez/engine/common/shuffle/impl/Shuffle.java | 10 +-
.../engine/common/sort/impl/ExternalSorter.java | 45 +---
.../common/sort/impl/PipelinedSorter.java | 15 +-
.../common/sort/impl/dflt/DefaultSorter.java | 4 +-
.../engine/common/task/impl/ValuesIterator.java | 9 +-
.../engine/lib/input/ShuffledMergedInput.java | 1 -
.../apache/tez/mapreduce/hadoop/MRHelpers.java | 13 +
.../hadoop/MultiStageMRConfToTezTranslator.java | 19 +-
.../tez/mapreduce/newcombine/MRCombiner.java | 242 +++++++++++++++++++
.../mapreduce/newpartition/MRPartitioner.java | 3 +-
.../mapreduce/newprocessor/MRTaskReporter.java | 7 +
17 files changed, 411 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
index 12c2b4b..7c4540c 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -128,6 +128,11 @@ public class TezJobConfig {
*/
public static final String TEZ_ENGINE_PARTITIONER_CLASS = "tez.engine.partitioner.class";
+ /**
+ * Specifies a combiner class (primarily for Shuffle)
+ */
+ public static final String TEZ_ENGINE_COMBINER_CLASS = "tez.engine.combiner.class";
+
public static final String TEZ_ENGINE_NUM_EXPECTED_PARTITIONS = "tez.engine.num.expected.partitions";
/**
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
index a92cf1b..f73adfd 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
@@ -140,5 +140,9 @@ public class ConfigUtils {
return ReflectionUtils.newInstance(theClass, conf);
}
+
+ public static boolean useNewApi(Configuration conf) {
+ return conf.getBoolean("mapred.mapper.new-api", false);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
index f352e08..3920ce6 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
@@ -29,9 +29,11 @@ import org.apache.tez.common.Constants;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.engine.api.Partitioner;
+import org.apache.tez.engine.common.combine.Combiner;
import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newapi.TezTaskContext;
public class TezEngineUtils {
@@ -55,6 +57,43 @@ public class TezEngineUtils {
}
@SuppressWarnings("unchecked")
+ public static Combiner instantiateCombiner(Configuration conf, TezTaskContext taskContext) throws IOException {
+ Class<? extends Combiner> clazz;
+ String className = conf.get(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS);
+ if (className == null) {
+ LOG.info("No combiner specified via " + TezJobConfig.TEZ_ENGINE_COMBINER_CLASS + ". Combiner will not be used");
+ return null;
+ }
+ LOG.info("Using Combiner class: " + className);
+ try {
+ clazz = (Class<? extends Combiner>) conf.getClassByName(className);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Unable to load combiner class: " + className);
+ }
+
+ Combiner combiner = null;
+
+ Constructor<? extends Combiner> ctor;
+ try {
+ ctor = clazz.getConstructor(TezTaskContext.class);
+ combiner = ctor.newInstance(taskContext);
+ } catch (SecurityException e) {
+ throw new IOException(e);
+ } catch (NoSuchMethodException e) {
+ throw new IOException(e);
+ } catch (IllegalArgumentException e) {
+ throw new IOException(e);
+ } catch (InstantiationException e) {
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(e);
+ } catch (InvocationTargetException e) {
+ throw new IOException(e);
+ }
+ return combiner;
+ }
+
+ @SuppressWarnings("unchecked")
public static Partitioner instantiatePartitioner(Configuration conf)
throws IOException {
Class<? extends Partitioner> clazz;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
index a33d00b..b7867aa 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
@@ -101,6 +101,8 @@ public class ValuesIterator {
return key;
}
+ // TODO NEWTEZ Maybe add another method which returns an iterator instead of iterable
+
public Iterable getValues() {
return new Iterable() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java
new file mode 100644
index 0000000..6f86d61
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.combine;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTaskContext;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+/**
+ *<b>Combiner Initialization</b></p> The Combiner class is picked up
+ * using the TEZ_ENGINE_COMBINER_CLASS attribute in {@link TezJobConfig}
+ *
+ *
+ * Combiners need to provide a single argument ({@link TezTaskContext})
+ * constructor.
+ */
+@Unstable
+@LimitedPrivate("mapreduce")
+public interface Combiner {
+ public void combine(TezRawKeyValueIterator rawIter, Writer writer)
+ throws InterruptedException, IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
index 093a293..ad9bb5f 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
@@ -48,6 +48,7 @@ import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.engine.common.InputAttemptIdentifier;
+import org.apache.tez.engine.common.combine.Combiner;
import org.apache.tez.engine.common.sort.impl.IFile;
import org.apache.tez.engine.common.sort.impl.TezMerger;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
@@ -55,7 +56,6 @@ import org.apache.tez.engine.common.sort.impl.IFile.Writer;
import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
import org.apache.tez.engine.hadoop.compat.NullProgressable;
-import org.apache.tez.engine.newapi.Processor;
import org.apache.tez.engine.newapi.TezInputContext;
@InterfaceAudience.Private
@@ -72,7 +72,7 @@ public class MergeManager {
private final TezTaskOutputFiles mapOutputFile;
private final Progressable nullProgressable = new NullProgressable();
- private final Processor combineProcessor = null; // TODO NEWTEZ Fix CombineProcessor
+ private final Combiner combiner;
Set inMemoryMergedMapOutputs =
new TreeSet(new MapOutput.MapOutputComparator());
@@ -98,12 +98,6 @@ public class MergeManager {
private final ExceptionReporter exceptionReporter;
private final TezInputContext inputContext;
-
- /**
- * Combiner processor to run during in-memory merge, if defined.
- */
- // TODO NEWTEZ Fix Combiner
- //private final Processor combineProcessor;
private final TezCounter spilledRecordsCounter;
@@ -119,18 +113,18 @@ public class MergeManager {
FileSystem localFS,
LocalDirAllocator localDirAllocator,
TezInputContext inputContext,
- Processor combineProcessor,
+ Combiner combiner,
TezCounter spilledRecordsCounter,
TezCounter reduceCombineInputCounter,
TezCounter mergedMapOutputsCounter,
ExceptionReporter exceptionReporter) {
- // TODO NEWTEZ Change to include Combiner
this.inputContext = inputContext;
this.conf = conf;
this.localDirAllocator = localDirAllocator;
this.exceptionReporter = exceptionReporter;
- //this.combineProcessor = combineProcessor;
+ this.combiner = combiner;
+
this.reduceCombineInputCounter = reduceCombineInputCounter;
this.spilledRecordsCounter = spilledRecordsCounter;
this.mergedMapOutputsCounter = mergedMapOutputsCounter;
@@ -370,27 +364,8 @@ public class MergeManager {
}
void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer)
- throws IOException, InterruptedException {
-
- // TODO NEWTEZ Fix CombineProcessor
-
-// CombineInput combineIn = new CombineInput(kvIter);
-// combineIn.initialize(conf, reporter);
-//
-// CombineOutput combineOut = new CombineOutput(writer);
-// combineOut.initialize(conf, reporter);
-//
-// try {
-// combineProcessor.process(new Input[] {combineIn},
-// new Output[] {combineOut});
-// } catch (IOException ioe) {
-// try {
-// combineProcessor.close();
-// } catch (IOException ignoredException) {}
-//
-// throw ioe;
-// }
-
+ throws IOException, InterruptedException {
+ combiner.combine(kvIter, writer);
}
private class IntermediateMemoryToMemoryMerger
@@ -500,7 +475,7 @@ public class MergeManager {
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
nullProgressable, spilledRecordsCounter, null, null);
- if (null == combineProcessor) {
+ if (null == combiner) {
TezMerger.writeFile(rIter, writer, nullProgressable, conf);
} else {
runCombineProcessor(rIter, writer);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
index 620c620..f605b7c 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
@@ -38,6 +38,8 @@ import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.common.TezEngineUtils;
+import org.apache.tez.engine.common.combine.Combiner;
import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.engine.newapi.Event;
@@ -67,11 +69,10 @@ public class Shuffle implements ExceptionReporter {
private final SecretKey jobTokenSecret;
private AtomicInteger reduceRange = new AtomicInteger(
TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT);
-
+
private FutureTask runShuffleFuture;
public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
- // TODO NEWTEZ Handle Combiner
this.inputContext = inputContext;
this.conf = conf;
this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
@@ -84,6 +85,8 @@ public class Shuffle implements ExceptionReporter {
.getJobTokenSecretFromTokenBytes(inputContext
.getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
+ Combiner combiner = TezEngineUtils.instantiateCombiner(conf, inputContext);
+
FileSystem localFS = FileSystem.getLocal(this.conf);
LocalDirAllocator localDirAllocator =
new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
@@ -123,7 +126,7 @@ public class Shuffle implements ExceptionReporter {
localFS,
localDirAllocator,
inputContext,
- null, // TODO NEWTEZ Fix Combiner
+ combiner,
spilledRecordsCounter,
reduceCombineInputCounter,
mergedMapOutputsCounter,
@@ -272,5 +275,4 @@ public class Shuffle implements ExceptionReporter {
throw e;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
index 8b20192..1b5e015 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
@@ -43,15 +43,14 @@ import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.engine.api.Partitioner;
-import org.apache.tez.engine.api.Processor;
import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.engine.common.TezEngineUtils;
+import org.apache.tez.engine.common.combine.Combiner;
import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
import org.apache.tez.engine.common.sort.impl.IFile.Writer;
import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
import org.apache.tez.engine.hadoop.compat.NullProgressable;
import org.apache.tez.engine.newapi.TezOutputContext;
-import org.apache.tez.engine.records.OutputContext;
@SuppressWarnings({"unchecked", "rawtypes"})
public abstract class ExternalSorter {
@@ -66,7 +65,7 @@ public abstract class ExternalSorter {
protected Progressable nullProgressable = new NullProgressable();
protected TezOutputContext outputContext;
- protected Processor combineProcessor;
+ protected Combiner combiner;
protected Partitioner partitioner;
protected Configuration conf;
protected FileSystem rfs;
@@ -84,9 +83,6 @@ public abstract class ExternalSorter {
// Compression for map-outputs
protected CompressionCodec codec;
- // TODO NEWTEZ Setup CombineProcessor
- // TODO NEWTEZ Setup Partitioner in SimpleOutput
-
// Counters
// TODO TEZ Rename all counter variables [Mapping of counter to MR for compatibility in the MR layer]
protected TezCounter mapOutputByteCounter;
@@ -139,12 +135,7 @@ public abstract class ExternalSorter {
LOG.info("Instantiating Partitioner: [" + conf.get(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS) + "]");
this.conf.setInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, this.partitions);
this.partitioner = TezEngineUtils.instantiatePartitioner(this.conf);
- }
-
- // TODO NEWTEZ Add an interface (! Processor) for CombineProcessor, which MR tasks can initialize and set.
- // Alternately add a config key with a classname, which is easy to initialize.
- public void setCombiner(Processor combineProcessor) {
- this.combineProcessor = combineProcessor;
+ this.combiner = TezEngineUtils.instantiateCombiner(this.conf, outputContext);
}
/**
@@ -165,27 +156,11 @@ public abstract class ExternalSorter {
protected void runCombineProcessor(TezRawKeyValueIterator kvIter,
Writer writer) throws IOException {
-
- // TODO NEWTEZ Fix Combiner.
-// CombineInput combineIn = new CombineInput(kvIter);
-// combineIn.initialize(job, runningTaskContext.getTaskReporter());
-//
-// CombineOutput combineOut = new CombineOutput(writer);
-// combineOut.initialize(job, runningTaskContext.getTaskReporter());
-//
-// try {
-// combineProcessor.process(new Input[] {combineIn},
-// new Output[] {combineOut});
-// } catch (IOException ioe) {
-// try {
-// combineProcessor.close();
-// } catch (IOException ignored) {}
-//
-// // Do not close output here as the sorter should close the combine output
-//
-// throw ioe;
-// }
-
+ try {
+ combiner.combine(kvIter, writer);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
}
/**
@@ -216,8 +191,4 @@ public abstract class ExternalSorter {
public ShuffleHeader getShuffleHeader(int reduce) {
throw new UnsupportedOperationException("getShuffleHeader isn't supported!");
}
-
- public OutputContext getOutputContext() {
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
index bafbd4d..952568e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
@@ -45,10 +45,9 @@ import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.newapi.TezOutputContext;
-import org.apache.tez.engine.records.OutputContext;
import org.apache.tez.engine.common.sort.impl.IFile.Writer;
import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
+import org.apache.tez.engine.newapi.TezOutputContext;
@SuppressWarnings({"unchecked", "rawtypes"})
public class PipelinedSorter extends ExternalSorter {
@@ -270,7 +269,7 @@ public class PipelinedSorter extends ExternalSorter {
new Writer(conf, out, keyClass, valClass, codec,
spilledRecordsCounter);
writer.setRLE(merger.needsRLE());
- if (combineProcessor == null) {
+ if (combiner == null) {
while(kvIter.next()) {
writer.append(kvIter.getKey(), kvIter.getValue());
}
@@ -380,10 +379,10 @@ public class PipelinedSorter extends ExternalSorter {
new Writer(conf, finalOut, keyClass, valClass, codec,
spilledRecordsCounter);
writer.setRLE(merger.needsRLE());
- if (combineProcessor == null || numSpills < minSpillsForCombine) {
+ if (combiner == null || numSpills < minSpillsForCombine) {
TezMerger.writeFile(kvIter, writer, nullProgressable, conf);
} else {
- runCombineProcessor(kvIter, writer);
+ runCombineProcessor(kvIter, writer);
}
//close
@@ -930,10 +929,4 @@ public class PipelinedSorter extends ExternalSorter {
}
}
-
- @Override
- public OutputContext getOutputContext() {
- return null;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
index b1e17e7..1ad31f7 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
@@ -752,7 +752,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
long segmentStart = out.getPos();
writer = new Writer(conf, out, keyClass, valClass, codec,
spilledRecordsCounter);
- if (combineProcessor == null) {
+ if (combiner == null) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
while (spindex < mend &&
@@ -1082,7 +1082,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
Writer writer =
new Writer(conf, finalOut, keyClass, valClass, codec,
spilledRecordsCounter);
- if (combineProcessor == null || numSpills < minSpillsForCombine) {
+ if (combiner == null || numSpills < minSpillsForCombine) {
TezMerger.writeFile(kvIter, writer,
nullProgressable, conf);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
index 126c5b2..841e54d 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
@@ -30,7 +30,14 @@ import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.util.Progressable;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-/** Iterates values while keys match in sorted input. */
+
+/**
+ * Iterates values while keys match in sorted input.
+ *
+ * Usage: Call moveToNext to move to the next k, v pair. This returns true if another exists,
+ * followed by getKey() and getValues() to get the current key and list of values.
+ *
+ */
public class ValuesIterator implements Iterator {
protected TezRawKeyValueIterator in; //input iterator
private KEY key; // current key
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
index 0732e20..91bb6d5 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
@@ -165,7 +165,6 @@ public class ShuffledMergedInput implements LogicalInput {
}
-
// This functionality is currently broken. If there's inputs which need to be
// written to disk, there's a possibility that inputs from the different
// sources could clobber each others' output. Also the current structures do
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 2f4a62a..7a9b7e0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -59,6 +59,7 @@ import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.mapreduce.newcombine.MRCombiner;
import org.apache.tez.mapreduce.newpartition.MRPartitioner;
@@ -367,6 +368,18 @@ public class MRHelpers {
// TODO eventually ACLs
conf.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, MRPartitioner.class.getName());
+
+ boolean useNewApi = conf.getBoolean("mapred.mapper.new-api", false);
+ if (useNewApi) {
+ if (conf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null) {
+ conf.set(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS, MRCombiner.class.getName());
+ }
+ } else {
+ if (conf.get("mapred.combiner.class") != null) {
+ conf.set(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS, MRCombiner.class.getName());
+ }
+ }
+
setWorkingDirectory(conf);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
index 6b68e95..ad231b3 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
@@ -18,7 +18,6 @@
package org.apache.tez.mapreduce.hadoop;
-import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
@@ -31,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys.MultiStageKeys;
+import org.apache.tez.mapreduce.newcombine.MRCombiner;
import org.apache.tez.mapreduce.newpartition.MRPartitioner;
import com.google.common.base.Preconditions;
@@ -220,22 +220,39 @@ public class MultiStageMRConfToTezTranslator {
// Assuming no 0 map jobs, and the first stage is always a map.
int numStages = numIntermediateStages + (hasFinalReduceStage ? 2 : 1);
+ // Setup Tez partitioner class
conf.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS,
MRPartitioner.class.getName());
+
+ // Setup Tez Combiner class if required.
+ // This would already have been set since the call is via JobClient
+ boolean useNewApi = conf.getBoolean("mapred.mapper.new-api", false);
+ if (useNewApi) {
+ if (conf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null) {
+ conf.set(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS, MRCombiner.class.getName());
+ }
+ } else {
+ if (conf.get("mapred.combiner.class") != null) {
+ conf.set(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS, MRCombiner.class.getName());
+ }
+ }
Configuration confs[] = new Configuration[numStages];
Configuration nonItermediateConf = MultiStageMRConfigUtil.extractStageConf(
conf, "");
+ confs[0].setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
if (numStages == 1) {
confs[0] = nonItermediateConf;
} else {
confs[0] = nonItermediateConf;
confs[numStages - 1] = new Configuration(nonItermediateConf);
+ confs[numStages -1].setBoolean(MRConfig.IS_MAP_PROCESSOR, false);
}
if (numStages > 2) {
for (int i = 1; i < numStages - 1; i++) {
confs[i] = MultiStageMRConfigUtil.extractStageConf(conf,
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i, ""));
+ confs[i].setBoolean(MRConfig.IS_MAP_PROCESSOR, false);
}
}
return confs;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newcombine/MRCombiner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newcombine/MRCombiner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newcombine/MRCombiner.java
new file mode 100644
index 0000000..788019a
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newcombine/MRCombiner.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.newcombine;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.ValuesIterator;
+import org.apache.tez.engine.common.combine.Combiner;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newapi.impl.TezTaskContextImpl;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
+import org.apache.tez.mapreduce.newprocessor.MRTaskReporter;
+
+/**
+ * Adapter that lets a user-configured MapReduce combiner run behind the Tez
+ * {@link Combiner} interface.
+ *
+ * <p>Depending on {@code ConfigUtils.useNewApi(conf)}, {@link #combine} instantiates
+ * either the old-API {@code org.apache.hadoop.mapred.Reducer} configured under
+ * {@code mapred.combiner.class} or the new-API
+ * {@code org.apache.hadoop.mapreduce.Reducer} configured under
+ * {@code MRJobConfig.COMBINE_CLASS_ATTR}, feeds it the records of the supplied
+ * raw iterator and appends everything it emits to the supplied IFile writer.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class MRCombiner implements Combiner {
+
+  private static final Log LOG = LogFactory.getLog(MRCombiner.class);
+
+  private final Configuration conf;
+  private final Class<?> keyClass;
+  private final Class<?> valClass;
+  private final RawComparator<?> comparator;
+  private final boolean useNewApi;
+
+  // NOTE(review): the "value" counter is bound to COMBINE_OUTPUT_RECORDS although it is
+  // handed to the iterators as an *input* value counter - confirm this is intended.
+  private final TezCounter combineInputKeyCounter;
+  private final TezCounter combineInputValueCounter;
+
+  private final MRTaskReporter reporter;
+  private final TaskAttemptID mrTaskAttemptID;
+
+  /**
+   * Builds a combiner from the configuration carried in the task context's user payload.
+   *
+   * <p>For a {@link TezOutputContext} the intermediate <em>output</em> key class, value
+   * class and key comparator are used; otherwise the intermediate <em>input</em> ones are.
+   * The MapReduce task attempt id is derived from the application id, task index and
+   * attempt number, typed as MAP when {@code MRConfig.IS_MAP_PROCESSOR} is true and as
+   * REDUCE otherwise (the default).
+   *
+   * @param taskContext context of the input or output this combiner is attached to
+   * @throws IOException if the configuration cannot be rebuilt from the user payload
+   */
+  public MRCombiner(TezTaskContextImpl taskContext) throws IOException {
+    this.conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
+
+    assert(taskContext instanceof TezInputContext || taskContext instanceof TezOutputContext);
+    if (taskContext instanceof TezOutputContext) {
+      this.keyClass = ConfigUtils.getIntermediateOutputKeyClass(conf);
+      // Fixed: this previously read the output *key* class for the value class.
+      this.valClass = ConfigUtils.getIntermediateOutputValueClass(conf);
+      this.comparator = ConfigUtils.getIntermediateOutputKeyComparator(conf);
+      this.reporter = new MRTaskReporter((TezOutputContext)taskContext);
+    } else {
+      this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+      this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+      this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
+      this.reporter = new MRTaskReporter((TezInputContext)taskContext);
+    }
+
+    this.useNewApi = ConfigUtils.useNewApi(conf);
+
+    combineInputKeyCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+    combineInputValueCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+
+    boolean isMap = conf.getBoolean(MRConfig.IS_MAP_PROCESSOR,false);
+    this.mrTaskAttemptID = new TaskAttemptID(
+        new TaskID(String.valueOf(taskContext.getApplicationId()
+            .getClusterTimestamp()), taskContext.getApplicationId().getId(),
+            isMap ? TaskType.MAP : TaskType.REDUCE,
+            taskContext.getTaskIndex()), taskContext.getTaskAttemptNumber());
+
+    LOG.info("Using combineKeyClass: " + keyClass + ", combineValueClass: " + valClass + ", combineComparator: " +comparator + ", useNewApi: " + useNewApi);
+  }
+
+  /**
+   * Runs the configured combiner over {@code rawIter}, appending its output to
+   * {@code writer}. The writer is not closed here.
+   *
+   * @param rawIter raw key/value records to combine
+   * @param writer destination for the combined records
+   * @throws InterruptedException propagated from the new-API reducer
+   * @throws IOException on read, combine or write failure
+   */
+  @Override
+  public void combine(TezRawKeyValueIterator rawIter, Writer writer)
+      throws InterruptedException, IOException {
+    if (useNewApi) {
+      runNewCombiner(rawIter, writer);
+    } else {
+      runOldCombiner(rawIter, writer);
+    }
+
+  }
+
+  ///////////////// Methods for old API //////////////////////
+
+  /**
+   * Instantiates the class configured under {@code mapred.combiner.class} and calls its
+   * {@code reduce} once per key group found in {@code rawIter}.
+   */
+  private void runOldCombiner(final TezRawKeyValueIterator rawIter, final Writer writer) throws IOException {
+    Class<? extends Reducer> reducerClazz = (Class<? extends Reducer>) conf.getClass("mapred.combiner.class", null, Reducer.class);
+
+    Reducer combiner = ReflectionUtils.newInstance(reducerClazz, conf);
+
+    OutputCollector collector = new OutputCollector() {
+      @Override
+      public void collect(Object key, Object value) throws IOException {
+        writer.append(key, value);
+      }
+    };
+
+    try {
+      CombinerValuesIterator values = new CombinerValuesIterator(rawIter, keyClass, valClass, comparator);
+
+      while (values.moveToNext()) {
+        combiner.reduce(values.getKey(), values.getValues().iterator(), collector, reporter);
+      }
+    } finally {
+      // The old-API Reducer is Closeable; release whatever the user combiner holds,
+      // even when reduce() fails part-way through.
+      combiner.close();
+    }
+  }
+
+  /** {@link ValuesIterator} wired to this combiner's configuration and counters. */
+  private final class CombinerValuesIterator extends ValuesIterator {
+    public CombinerValuesIterator(TezRawKeyValueIterator rawIter,
+        Class keyClass, Class valClass,
+        RawComparator comparator) throws IOException {
+      super(rawIter, comparator, keyClass, valClass, conf,
+          combineInputKeyCounter, combineInputValueCounter);
+    }
+  }
+
+  ///////////////// End of methods for old API //////////////////////
+
+  ///////////////// Methods for new API //////////////////////
+
+  /**
+   * Instantiates the class configured under {@code MRJobConfig.COMBINE_CLASS_ATTR} and
+   * runs it against a reduce context backed by {@code rawIter}; every record the reducer
+   * writes is appended to {@code writer}.
+   */
+  private void runNewCombiner(final TezRawKeyValueIterator rawIter, final Writer writer) throws InterruptedException, IOException {
+
+    RecordWriter recordWriter = new RecordWriter() {
+
+      @Override
+      public void write(Object key, Object value) throws IOException,
+          InterruptedException {
+        writer.append(key, value);
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) throws IOException,
+          InterruptedException {
+        // Will be closed by whoever invokes the combiner.
+      }
+    };
+
+    Class<? extends org.apache.hadoop.mapreduce.Reducer> reducerClazz = (Class<? extends org.apache.hadoop.mapreduce.Reducer>) conf
+        .getClass(MRJobConfig.COMBINE_CLASS_ATTR, null,
+            org.apache.hadoop.mapreduce.Reducer.class);
+    org.apache.hadoop.mapreduce.Reducer reducer = ReflectionUtils.newInstance(reducerClazz, conf);
+
+    org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
+        createReduceContext(
+            conf,
+            mrTaskAttemptID,
+            rawIter,
+            new MRCounters.MRCounter(combineInputKeyCounter),
+            new MRCounters.MRCounter(combineInputValueCounter),
+            recordWriter,
+            reporter,
+            (RawComparator)comparator,
+            keyClass,
+            valClass);
+
+    reducer.run(reducerContext);
+    // No-op for the adapter above; kept so the writer lifecycle mirrors a regular reduce.
+    recordWriter.close(reducerContext);
+  }
+
+  /**
+   * Wraps the Tez raw iterator in a Hadoop {@link RawKeyValueIterator} (pure delegation)
+   * and builds a new-API reducer context around it. No output committer is supplied
+   * ({@code null}).
+   */
+  private static org.apache.hadoop.mapreduce.Reducer.Context createReduceContext(
+      Configuration conf,
+      TaskAttemptID mrTaskAttemptID,
+      final TezRawKeyValueIterator rawIter,
+      Counter combineInputKeyCounter,
+      Counter combineInputValueCounter,
+      RecordWriter recordWriter,
+      MRTaskReporter reporter,
+      RawComparator comparator,
+      Class keyClass,
+      Class valClass) throws InterruptedException, IOException {
+
+    RawKeyValueIterator r = new RawKeyValueIterator() {
+
+      @Override
+      public boolean next() throws IOException {
+        return rawIter.next();
+      }
+
+      @Override
+      public DataInputBuffer getValue() throws IOException {
+        return rawIter.getValue();
+      }
+
+      @Override
+      public Progress getProgress() {
+        return rawIter.getProgress();
+      }
+
+      @Override
+      public DataInputBuffer getKey() throws IOException {
+        return rawIter.getKey();
+      }
+
+      @Override
+      public void close() throws IOException {
+        rawIter.close();
+      }
+    };
+
+    ReduceContext rContext = new ReduceContextImpl(
+        conf, mrTaskAttemptID, r, combineInputKeyCounter,
+        combineInputValueCounter, recordWriter, null, reporter, comparator,
+        keyClass, valClass);
+
+    org.apache.hadoop.mapreduce.Reducer.Context reducerContext = new WrappedReducer()
+        .getReducerContext(rContext);
+    return reducerContext;
+  }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
index 4a967ad..dcea35c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -38,7 +39,7 @@ public class MRPartitioner implements org.apache.tez.engine.api.Partitioner {
private org.apache.hadoop.mapred.Partitioner oldPartitioner;
public MRPartitioner(Configuration conf) {
- this.useNewApi = conf.getBoolean("mapred.mapper.new-api", false);
+ this.useNewApi = ConfigUtils.useNewApi(conf);
this.partitions = conf.getInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, 1);
if (useNewApi) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java
index d01e562..c7c9567 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.Reporter;
import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.newapi.TezInputContext;
import org.apache.tez.engine.newapi.TezOutputContext;
import org.apache.tez.engine.newapi.TezProcessorContext;
import org.apache.tez.engine.newapi.TezTaskContext;
@@ -53,6 +54,12 @@ public class MRTaskReporter
this.reporter = new MRReporter(context);
this.isProcessorContext = false;
}
+
+ /**
+  * Creates a reporter backed by a Tez input context.
+  *
+  * <p>The reporter is flagged as not wrapping a processor context.
+  *
+  * @param context the input context that is stored and handed to the underlying
+  *        {@code MRReporter}
+  */
+ public MRTaskReporter(TezInputContext context) {
+   this.isProcessorContext = false;
+   this.reporter = new MRReporter(context);
+   this.context = context;
+ }
public void setProgress(float progress) {
if (isProcessorContext) {