tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [05/20] git commit: TEZ-444. Rename *.new* packages back to what they should be, remove dead code from the old packages - mapreduce module (part of TEZ-398). (sseth)
Date Mon, 23 Sep 2013 17:45:19 GMT
TEZ-444.  Rename *.new* packages back to what they should be, remove
dead code from the old packages - mapreduce module (part of TEZ-398).
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3d609458
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3d609458
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3d609458

Branch: refs/heads/TEZ-398
Commit: 3d6094588cc43f8140b2360bd9032b2ed7249e1a
Parents: 5eb0c86
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Sep 23 10:43:39 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Sep 23 10:43:39 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/YarnTezDagChild.java   |   4 +-
 .../tez/mapreduce/examples/MRRSleepJob.java     |   6 +-
 .../mapreduce/examples/OrderedWordCount.java    |   6 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |   6 +-
 .../apache/hadoop/mapred/LocalJobRunnerTez.java |   8 +-
 .../tez/mapreduce/combine/MRCombiner.java       | 440 +++++------
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  |   4 +-
 .../hadoop/MultiStageMRConfToTezTranslator.java |   4 +-
 .../tez/mapreduce/hadoop/mapred/MRReporter.java |  76 ++
 .../hadoop/mapred/TaskAttemptContextImpl.java   |  21 +-
 .../hadoop/mapreduce/MapContextImpl.java        |   6 +-
 .../hadoop/mapreduce/ReduceContextImpl.java     | 359 ---------
 .../mapreduce/TaskAttemptContextImpl.java       | 102 +--
 .../mapreduce/TaskInputOutputContextImpl.java   |   6 +-
 .../mapreduce/hadoop/newmapred/MRReporter.java  |  76 --
 .../newmapred/TaskAttemptContextImpl.java       |  93 ---
 .../hadoop/newmapreduce/MapContextImpl.java     |  84 ---
 .../newmapreduce/TaskAttemptContextImpl.java    |  90 ---
 .../TaskInputOutputContextImpl.java             |  95 ---
 .../input/ShuffledMergedInputLegacy.java        |  29 +
 .../apache/tez/mapreduce/input/SimpleInput.java | 359 +++++----
 .../tez/mapreduce/input/SimpleInputLegacy.java  |  36 +
 .../tez/mapreduce/newcombine/MRCombiner.java    | 242 ------
 .../newinput/ShuffledMergedInputLegacy.java     |  29 -
 .../tez/mapreduce/newinput/SimpleInput.java     | 438 -----------
 .../mapreduce/newinput/SimpleInputLegacy.java   |  36 -
 .../tez/mapreduce/newoutput/SimpleOutput.java   | 326 ---------
 .../mapreduce/newpartition/MRPartitioner.java   |  88 ---
 .../FileSystemStatisticsUpdater.java            |  84 ---
 .../mapreduce/newprocessor/GcTimeUpdater.java   |  71 --
 .../tez/mapreduce/newprocessor/MRTask.java      | 731 -------------------
 .../mapreduce/newprocessor/MRTaskReporter.java  | 122 ----
 .../newprocessor/map/MapProcessor.java          | 341 ---------
 .../newprocessor/reduce/ReduceProcessor.java    | 353 ---------
 .../tez/mapreduce/output/SimpleOutput.java      | 355 ++++++---
 .../tez/mapreduce/partition/MRPartitioner.java  | 105 ++-
 .../apache/tez/mapreduce/processor/MRTask.java  | 625 +++++++---------
 .../tez/mapreduce/processor/MRTaskReporter.java |  77 +-
 .../processor/TezTaskReporterImpl.java          | 279 -------
 .../mapreduce/processor/map/MapProcessor.java   | 268 +++----
 .../processor/reduce/ReduceProcessor.java       | 336 ++++-----
 .../tez/mapreduce/task/MRRuntimeTask.java       | 272 -------
 .../tez/mapreduce/processor/MapUtils.java       |   2 +-
 .../processor/map/TestMapProcessor.java         |   2 +-
 .../processor/reduce/TestReduceProcessor.java   |   8 +-
 .../org/apache/tez/mapreduce/YARNRunner.java    |   6 +-
 46 files changed, 1439 insertions(+), 5667 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 2d3e4d1..31898a3 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -85,8 +85,8 @@ import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
 import org.apache.tez.engine.newapi.impl.TezUmbilical;
 import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
-import org.apache.tez.mapreduce.newinput.SimpleInputLegacy;
-import org.apache.tez.mapreduce.newoutput.SimpleOutput;
+import org.apache.tez.mapreduce.input.SimpleInputLegacy;
+import org.apache.tez.mapreduce.output.SimpleOutput;
 
 import com.google.inject.Guice;
 import com.google.inject.Injector;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 3e79949..429d458 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -85,9 +85,9 @@ import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.tez.mapreduce.newinput.ShuffledMergedInputLegacy;
-import org.apache.tez.mapreduce.newprocessor.map.MapProcessor;
-import org.apache.tez.mapreduce.newprocessor.reduce.ReduceProcessor;
+import org.apache.tez.mapreduce.input.ShuffledMergedInputLegacy;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 
 import com.google.common.annotations.VisibleForTesting;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index adfc8cd..07fe58a 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -75,9 +75,9 @@ import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
-import org.apache.tez.mapreduce.newinput.ShuffledMergedInputLegacy;
-import org.apache.tez.mapreduce.newprocessor.map.MapProcessor;
-import org.apache.tez.mapreduce.newprocessor.reduce.ReduceProcessor;
+import org.apache.tez.mapreduce.input.ShuffledMergedInputLegacy;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 
 /**
  * An MRR job built on top of word count to return words sorted by

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index eb20876..aca5b8e 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -79,9 +79,9 @@ import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
-import org.apache.tez.mapreduce.newinput.ShuffledMergedInputLegacy;
-import org.apache.tez.mapreduce.newprocessor.map.MapProcessor;
-import org.apache.tez.mapreduce.newprocessor.reduce.ReduceProcessor;
+import org.apache.tez.mapreduce.input.ShuffledMergedInputLegacy;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
index 1362396..ac6d5dd 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
@@ -84,10 +84,10 @@ import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
 import org.apache.tez.engine.runtime.RuntimeUtils;
 import org.apache.tez.mapreduce.hadoop.IDConverter;
 import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
-import org.apache.tez.mapreduce.newinput.SimpleInput;
-import org.apache.tez.mapreduce.newoutput.SimpleOutput;
-import org.apache.tez.mapreduce.newprocessor.map.MapProcessor;
-import org.apache.tez.mapreduce.newprocessor.reduce.ReduceProcessor;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
index dc7c53d..56e88c7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
@@ -25,324 +25,218 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.ReduceContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
 import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.api.Processor;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.combine.CombineInput;
-import org.apache.tez.engine.common.combine.CombineOutput;
+import org.apache.tez.engine.common.ValuesIterator;
+import org.apache.tez.engine.common.combine.Combiner;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.mapreduce.hadoop.IDConverter;
-import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
-import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newapi.TezTaskContext;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 
-public class MRCombiner implements Processor {
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class MRCombiner implements Combiner {
 
   private static Log LOG = LogFactory.getLog(MRCombiner.class);
-
-  JobConf jobConf;
-  boolean useNewApi;
-
-  private final MRTask task;
-
-  private Counter combinerInputKeyCounter;
-  private Counter combinerInputValueCounter;
-  private Progress combinePhase;
-
-  public MRCombiner(MRTask task) {
-    this.task = task;
-  }
-
-  @Override
-  public void initialize(Configuration conf, Master master) throws IOException,
-      InterruptedException {
-    if (conf instanceof JobConf) {
-      jobConf = (JobConf)conf;
+  
+  private final Configuration conf;
+  private final Class<?> keyClass;
+  private final Class<?> valClass;
+  private final RawComparator<?> comparator;
+  private final boolean useNewApi;
+  
+  private final TezCounter combineInputKeyCounter;
+  private final TezCounter combineInputValueCounter;
+  
+  private final MRTaskReporter reporter;
+  private final TaskAttemptID mrTaskAttemptID;
+
+  public MRCombiner(TezTaskContext taskContext) throws IOException {
+    this.conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
+
+    assert(taskContext instanceof TezInputContext || taskContext instanceof TezOutputContext);
+    if (taskContext instanceof TezOutputContext) {
+      this.keyClass = ConfigUtils.getIntermediateOutputKeyClass(conf);
+      this.valClass = ConfigUtils.getIntermediateOutputValueClass(conf);
+      this.comparator = ConfigUtils.getIntermediateOutputKeyComparator(conf);
+      this.reporter = new MRTaskReporter((TezOutputContext)taskContext);
     } else {
-      jobConf = new JobConf(conf);
+      this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+      this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+      this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
+      this.reporter = new MRTaskReporter((TezInputContext)taskContext);
     }
-    useNewApi = jobConf.getUseNewMapper();
+
+    this.useNewApi = ConfigUtils.useNewApi(conf);
+    
+    combineInputKeyCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+    combineInputValueCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+    
+    boolean isMap = conf.getBoolean(MRConfig.IS_MAP_PROCESSOR,false);
+    this.mrTaskAttemptID = new TaskAttemptID(
+        new TaskID(String.valueOf(taskContext.getApplicationId()
+            .getClusterTimestamp()), taskContext.getApplicationId().getId(),
+            isMap ? TaskType.MAP : TaskType.REDUCE,
+            taskContext.getTaskIndex()), taskContext.getTaskAttemptNumber());
+    
+    LOG.info("Using combineKeyClass: " + keyClass + ", combineValueClass: " + valClass + ", combineComparator: " +comparator + ", useNewApi: " + useNewApi);
   }
 
   @Override
-  public void process(Input[] in, Output[] out) throws IOException,
-      InterruptedException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Running MRCombiner, usingNewAPI=" + useNewApi);
-    }
-
-    CombineInput input = (CombineInput)in[0];
-    CombineOutput output = (CombineOutput)out[0];
-
-    combinePhase  = task.getProgress().addPhase("combine");
-
-    Class<?> keyClass = ConfigUtils.getIntermediateOutputKeyClass(jobConf);
-    Class<?> valueClass = ConfigUtils.getIntermediateOutputValueClass(jobConf);
-    LOG.info("Using combineKeyClass: " + keyClass);
-    LOG.info("Using combineValueClass: " + valueClass);
-    RawComparator<?> comparator =
-        ConfigUtils.getIntermediateOutputKeyComparator(jobConf);
-    LOG.info("Using combineComparator: " + comparator);
-
-    combinerInputKeyCounter =
-        task.getMRReporter().getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
-    combinerInputValueCounter =
-        task.getMRReporter().getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
-
+  public void combine(TezRawKeyValueIterator rawIter, Writer writer)
+      throws InterruptedException, IOException {
     if (useNewApi) {
-      try {
-        runNewCombiner(this.jobConf,
-            task.getUmbilical(),
-            task.getMRReporter(),
-            input, comparator, keyClass, valueClass, output);
-      } catch (ClassNotFoundException cnfe) {
-        throw new IOException(cnfe);
-      }
+      runNewCombiner(rawIter, writer);
     } else {
-      runOldCombiner(this.jobConf,
-          task.getUmbilical(),
-          task.getMRReporter(),
-          input,
-          comparator, keyClass, valueClass,
-          output);
+      runOldCombiner(rawIter, writer);
     }
+    
   }
 
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  private void runOldCombiner(JobConf job,
-        TezTaskUmbilicalProtocol umbilical,
-        final MRTaskReporter reporter,
-        CombineInput input,
-        RawComparator comparator,
-        Class keyClass,
-        Class valueClass,
-        final Output output) throws IOException, InterruptedException {
-
-    Reducer combiner =
-        ReflectionUtils.newInstance(job.getCombinerClass(), job);
-
-    // make output collector
-
-    OutputCollector collector =
-        new OutputCollector() {
-      public void collect(Object key, Object value)
-          throws IOException {
-        try {
-          output.write(key, value);
-        } catch (InterruptedException ie) {
-          throw new IOException(ie);
-        }
+  ///////////////// Methods for old API //////////////////////
+  
+  private void runOldCombiner(final TezRawKeyValueIterator rawIter, final Writer writer) throws IOException {
+    Class<? extends Reducer> reducerClazz = (Class<? extends Reducer>) conf.getClass("mapred.combiner.class", null, Reducer.class);
+    
+    Reducer combiner = ReflectionUtils.newInstance(reducerClazz, conf);
+    
+    OutputCollector collector = new OutputCollector() {
+      @Override
+      public void collect(Object key, Object value) throws IOException {
+        writer.append(key, value);
       }
     };
-
-    // apply combiner function
-    CombinerValuesIterator values =
-        new CombinerValuesIterator(input,
-            comparator, keyClass, valueClass, job, reporter,
-            combinerInputValueCounter, combinePhase);
-
-    values.informReduceProgress();
-    while (values.more()) {
-      combinerInputKeyCounter.increment(1);
-      combiner.reduce(values.getKey(), values, collector, reporter);
-      values.nextKey();
-      values.informReduceProgress();
+    
+    CombinerValuesIterator values = new CombinerValuesIterator(rawIter, keyClass, valClass, comparator);
+    
+    while (values.moveToNext()) {
+      combiner.reduce(values.getKey(), values.getValues().iterator(), collector, reporter);
     }
   }
-
-  private static final class CombinerValuesIterator<KEY,VALUE>
-  extends org.apache.tez.engine.common.task.impl.ValuesIterator<KEY,VALUE> {
-    private Counter combineInputValueCounter;
-    private Progress combinePhase;
-
-    public CombinerValuesIterator (CombineInput in,
-        RawComparator<KEY> comparator,
-        Class<KEY> keyClass,
-        Class<VALUE> valClass,
-        Configuration conf, Progressable reporter,
-        Counter combineInputValueCounter,
-        Progress combinePhase)
-            throws IOException {
-      super(in.getIterator(), comparator, keyClass, valClass, conf, reporter);
-      this.combineInputValueCounter = combineInputValueCounter;
-      this.combinePhase = combinePhase;
-    }
-
-    @Override
-    public VALUE next() {
-      combineInputValueCounter.increment(1);
-      return moveToNext();
-    }
-
-    protected VALUE moveToNext() {
-      return super.next();
-    }
-
-    public void informReduceProgress() {
-      // FIXME implement correct progress updates for combiner TEZ-184
-      // combinePhase.set(super.in.getProgress().getProgress()); // update progress
-      reporter.progress();
+  
+  private final class CombinerValuesIterator<KEY,VALUE> extends ValuesIterator<KEY, VALUE> {
+    public CombinerValuesIterator(TezRawKeyValueIterator rawIter,
+        Class<KEY> keyClass, Class<VALUE> valClass,
+        RawComparator<KEY> comparator) throws IOException {
+      super(rawIter, comparator, keyClass, valClass, conf,
+          combineInputKeyCounter, combineInputValueCounter);
     }
   }
-
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  private void runNewCombiner(JobConf job,
-      final TezTaskUmbilicalProtocol umbilical,
-      final MRTaskReporter reporter,
-      CombineInput input,
-      RawComparator comparator,
-      Class keyClass,
-      Class valueClass,
-      final Output out
-      ) throws IOException,InterruptedException,
-      ClassNotFoundException {
-    // wrap value iterator to report progress.
-    final TezRawKeyValueIterator rawIter = input.getIterator();
-    TezRawKeyValueIterator rIter = new TezRawKeyValueIterator() {
-      public void close() throws IOException {
-        rawIter.close();
-      }
-      public DataInputBuffer getKey() throws IOException {
-        return rawIter.getKey();
+  
+  ///////////////// End of methods for old API //////////////////////
+  
+  ///////////////// Methods for new API //////////////////////
+  
+  private void runNewCombiner(final TezRawKeyValueIterator rawIter, final Writer writer) throws InterruptedException, IOException {
+    
+    RecordWriter recordWriter = new RecordWriter() {
+
+      @Override
+      public void write(Object key, Object value) throws IOException,
+          InterruptedException {
+        writer.append(key, value);
       }
-      public Progress getProgress() {
-        return rawIter.getProgress();
-      }
-      public DataInputBuffer getValue() throws IOException {
-        return rawIter.getValue();
-      }
-      public boolean next() throws IOException {
-        boolean ret = rawIter.next();
-        // FIXME progress updates for combiner
-        // reporter.setProgress(rawIter.getProgress().getProgress());
-        return ret;
+
+      @Override
+      public void close(TaskAttemptContext context) throws IOException,
+          InterruptedException {
+        // Will be closed by whoever invokes the combiner.
       }
     };
-
-    // make a task context so we can get the classes
-    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
-        new TaskAttemptContextImpl(job, task.getTaskAttemptId(), reporter);
-
-    // make a reducer
-    org.apache.hadoop.mapreduce.Reducer reducer =
-        (org.apache.hadoop.mapreduce.Reducer)
-        ReflectionUtils.newInstance(taskContext.getCombinerClass(), job);
-
-    org.apache.hadoop.mapreduce.RecordWriter trackedRW =
-        new org.apache.hadoop.mapreduce.RecordWriter() {
-
-          @Override
-          public void write(Object key, Object value) throws IOException,
-              InterruptedException {
-            out.write(key, value);
-          }
-
-          @Override
-          public void close(TaskAttemptContext context) throws IOException,
-              InterruptedException {
-            // Should not close this here as the sorter will close the
-            // combine output
-          }
-        };
-
+    
+    Class<? extends org.apache.hadoop.mapreduce.Reducer> reducerClazz = (Class<? extends org.apache.hadoop.mapreduce.Reducer>) conf
+        .getClass(MRJobConfig.COMBINE_CLASS_ATTR, null,
+            org.apache.hadoop.mapreduce.Reducer.class);
+    org.apache.hadoop.mapreduce.Reducer reducer = ReflectionUtils.newInstance(reducerClazz, conf);
+    
     org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
         createReduceContext(
-            reducer, job, task.getTaskAttemptId(),
-            rIter, combinerInputKeyCounter,
-            combinerInputValueCounter,
-            trackedRW,
-            null,
-            reporter, comparator, keyClass,
-            valueClass);
+            conf,
+            mrTaskAttemptID,
+            rawIter,
+            new MRCounters.MRCounter(combineInputKeyCounter),
+            new MRCounters.MRCounter(combineInputValueCounter),
+            recordWriter,
+            reporter,
+            (RawComparator)comparator,
+            keyClass,
+            valClass);
+    
     reducer.run(reducerContext);
-    trackedRW.close(reducerContext);
-  }
-
-  @Override
-  public void close() throws IOException, InterruptedException {
+    recordWriter.close(reducerContext);
   }
 
-  protected static <INKEY,INVALUE,OUTKEY,OUTVALUE>
-  org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
-  createReduceContext(org.apache.hadoop.mapreduce.Reducer
-                        <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
-                      Configuration job,
-                      TezTaskAttemptID taskId,
-                      final TezRawKeyValueIterator rIter,
-                      org.apache.hadoop.mapreduce.Counter inputKeyCounter,
-                      org.apache.hadoop.mapreduce.Counter inputValueCounter,
-                      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
-                      org.apache.hadoop.mapreduce.OutputCommitter committer,
-                      org.apache.hadoop.mapreduce.StatusReporter reporter,
-                      RawComparator<INKEY> comparator,
-                      Class<INKEY> keyClass, Class<INVALUE> valueClass
-  ) throws IOException, InterruptedException {
-    RawKeyValueIterator r =
-        new RawKeyValueIterator() {
-
-          @Override
-          public boolean next() throws IOException {
-            return rIter.next();
-          }
+  private static <KEYIN, VALUEIN, KEYOUT, VALUEOUT> org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
+      Configuration conf,
+      TaskAttemptID mrTaskAttemptID,
+      final TezRawKeyValueIterator rawIter,
+      Counter combineInputKeyCounter,
+      Counter combineInputValueCounter,
+      RecordWriter<KEYOUT, VALUEOUT> recordWriter,
+      MRTaskReporter reporter,
+      RawComparator<KEYIN> comparator,
+      Class<KEYIN> keyClass,
+      Class<VALUEIN> valClass) throws InterruptedException, IOException {
+
+    RawKeyValueIterator r = new RawKeyValueIterator() {
+
+      @Override
+      public boolean next() throws IOException {
+        return rawIter.next();
+      }
 
-          @Override
-          public DataInputBuffer getValue() throws IOException {
-            return rIter.getValue();
-          }
+      @Override
+      public DataInputBuffer getValue() throws IOException {
+        return rawIter.getValue();
+      }
 
-          @Override
-          public Progress getProgress() {
-            return rIter.getProgress();
-          }
+      @Override
+      public Progress getProgress() {
+        return rawIter.getProgress();
+      }
 
-          @Override
-          public DataInputBuffer getKey() throws IOException {
-            return rIter.getKey();
-          }
+      @Override
+      public DataInputBuffer getKey() throws IOException {
+        return rawIter.getKey();
+      }
 
-          @Override
-          public void close() throws IOException {
-            rIter.close();
-          }
-        };
-    org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
-    reduceContext =
-      new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
-          job,
-          IDConverter.toMRTaskAttemptId(taskId),
-          r,
-          inputKeyCounter,
-          inputValueCounter,
-          output,
-          committer,
-          reporter,
-          comparator,
-          keyClass,
-          valueClass);
+      @Override
+      public void close() throws IOException {
+        rawIter.close();
+      }
+    };
 
-    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
-        reducerContext = new
-          WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(
-              reduceContext);
+    ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> rContext = new ReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
+        conf, mrTaskAttemptID, r, combineInputKeyCounter,
+        combineInputValueCounter, recordWriter, null, reporter, comparator,
+        keyClass, valClass);
 
+    org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext = new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
+        .getReducerContext(rContext);
     return reducerContext;
   }
 
+  
+ 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 7a9b7e0..b0ed6ab 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -59,8 +59,8 @@ import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
-import org.apache.tez.mapreduce.newcombine.MRCombiner;
-import org.apache.tez.mapreduce.newpartition.MRPartitioner;
+import org.apache.tez.mapreduce.combine.MRCombiner;
+import org.apache.tez.mapreduce.partition.MRPartitioner;
 
 
 public class MRHelpers {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
index d768312..d888c42 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
@@ -29,9 +29,9 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.mapreduce.combine.MRCombiner;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys.MultiStageKeys;
-import org.apache.tez.mapreduce.newcombine.MRCombiner;
-import org.apache.tez.mapreduce.newpartition.MRPartitioner;
+import org.apache.tez.mapreduce.partition.MRPartitioner;
 
 import com.google.common.base.Preconditions;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
new file mode 100644
index 0000000..f5e08dc
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
@@ -0,0 +1,76 @@
+package org.apache.tez.mapreduce.hadoop.mapred;
+
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.tez.engine.newapi.TezProcessorContext;
+import org.apache.tez.engine.newapi.TezTaskContext;
+import org.apache.tez.mapreduce.common.Utils;
+
+public class MRReporter implements Reporter {
+
+  private TezTaskContext tezTaskContext;
+  private InputSplit split;
+  private boolean isProcessorContext = false;
+  
+  public MRReporter(TezProcessorContext tezProcContext) {
+    this(tezProcContext, null);
+    isProcessorContext = true;
+  }
+  public MRReporter(TezTaskContext tezTaskContext) {
+    this(tezTaskContext, null);
+  }
+
+  public MRReporter(TezTaskContext tezTaskContext, InputSplit split) {
+    this.tezTaskContext = tezTaskContext;
+    this.split = split;
+  }
+  
+  @Override
+  public void progress() {
+    //TODO NEWTEZ
+  }
+
+  @Override
+  public void setStatus(String status) {
+    // Not setting status string in Tez.
+
+  }
+
+  @Override
+  public Counter getCounter(Enum<?> name) {
+    return Utils.getMRCounter(tezTaskContext.getCounters().findCounter(name));
+  }
+
+  @Override
+  public Counter getCounter(String group, String name) {
+    return Utils.getMRCounter(tezTaskContext.getCounters().findCounter(group,
+        name));
+  }
+
+  @Override
+  public void incrCounter(Enum<?> key, long amount) {
+    getCounter(key).increment(amount);
+  }
+
+  @Override
+  public void incrCounter(String group, String counter, long amount) {
+    getCounter(group, counter).increment(amount);
+  }
+
+  @Override
+  public InputSplit getInputSplit() throws UnsupportedOperationException {
+    if (split == null) {
+      throw new UnsupportedOperationException("Input only available on map");
+    } else {
+      return split;
+    }
+  }
+
+  @Override
+  public float getProgress() {
+    // TODO NEWTEZ Does this make a difference to anything ?
+    return 0.0f;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java
index d953891..fc32825 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java
@@ -22,11 +22,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.util.Progressable;
-import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 
 @InterfaceAudience.Private
@@ -36,24 +33,10 @@ public class TaskAttemptContextImpl
        implements TaskAttemptContext {
   private MRTaskReporter reporter;
 
-  public TaskAttemptContextImpl(JobConf conf, TezTaskAttemptID taskid) {
-    this(conf, taskid, null);
-  }
-  
   // FIXME we need to use DAG Id but we are using App Id
-  public TaskAttemptContextImpl(JobConf conf, TezTaskAttemptID taskAttemptId,
+  public TaskAttemptContextImpl(JobConf conf, TaskAttemptID taskAttemptId,
                          MRTaskReporter reporter) {
-    super(conf, 
-        new TaskAttemptID(
-            new TaskID(
-                Long.toString(taskAttemptId.getTaskID().getVertexID().
-                    getDAGId().getApplicationId().getClusterTimestamp()),
-                taskAttemptId.getTaskID().getVertexID().getDAGId().
-                    getApplicationId().getId(),
-                (taskAttemptId.getTaskID().getVertexID().getId() == 0 ?
-                    TaskType.MAP : TaskType.REDUCE),
-                taskAttemptId.getTaskID().getId()),
-              taskAttemptId.getId()));
+    super(conf, taskAttemptId);
     this.reporter = reporter;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
index 4552397..b0348c9 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.engine.newapi.TezTaskContext;
 
 /**
  * The context that is given to the {@link Mapper}.
@@ -51,9 +51,9 @@ public class MapContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
                         RecordReader<KEYIN,VALUEIN> reader,
                         RecordWriter<KEYOUT,VALUEOUT> writer,
                         OutputCommitter committer,
-                        MRTaskReporter reporter,
+                        TezTaskContext context,
                         InputSplit split) {
-    super(conf, taskid, writer, committer, reporter);
+    super(conf, taskid, writer, committer, context);
     this.reader = reader;
     this.split = split;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java
deleted file mode 100644
index e775b7e..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.hadoop.mapreduce;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.BackupStore;
-import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.ReduceContext;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.Progressable;
-import org.apache.tez.mapreduce.processor.MRTaskReporter;
-
-/**
- * The context passed to the {@link Reducer}.
- * @param <KEYIN> the class of the input keys
- * @param <VALUEIN> the class of the input values
- * @param <KEYOUT> the class of the output keys
- * @param <VALUEOUT> the class of the output values
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class ReduceContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
-    extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
-    implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
-  private RawKeyValueIterator input;
-  private Counter inputValueCounter;
-  private Counter inputKeyCounter;
-  private RawComparator<KEYIN> comparator;
-  private KEYIN key;                                  // current key
-  private VALUEIN value;                              // current value
-  private boolean firstValue = false;                 // first value in key
-  private boolean nextKeyIsSame = false;              // more w/ this key
-  private boolean hasMore;                            // more in file
-  protected Progressable reporter;
-  private Deserializer<KEYIN> keyDeserializer;
-  private Deserializer<VALUEIN> valueDeserializer;
-  private DataInputBuffer buffer = new DataInputBuffer();
-  private BytesWritable currentRawKey = new BytesWritable();
-  private ValueIterable iterable = new ValueIterable();
-  private boolean isMarked = false;
-  private BackupStore<KEYIN,VALUEIN> backupStore;
-  private final SerializationFactory serializationFactory;
-  private final Class<KEYIN> keyClass;
-  private final Class<VALUEIN> valueClass;
-  private final Configuration conf;
-  private final TaskAttemptID taskid;
-  private int currentKeyLength = -1;
-  private int currentValueLength = -1;
-  
-  public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
-                           RawKeyValueIterator input, 
-                           Counter inputKeyCounter,
-                           Counter inputValueCounter,
-                           RecordWriter<KEYOUT,VALUEOUT> output,
-                           OutputCommitter committer,
-                           MRTaskReporter reporter,
-                           RawComparator<KEYIN> comparator,
-                           Class<KEYIN> keyClass,
-                           Class<VALUEIN> valueClass
-                          ) throws InterruptedException, IOException{
-    super(conf, taskid, output, committer, reporter);
-    this.input = input;
-    this.inputKeyCounter = inputKeyCounter;
-    this.inputValueCounter = inputValueCounter;
-    this.comparator = comparator;
-    this.serializationFactory = new SerializationFactory(conf);
-    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
-    this.keyDeserializer.open(buffer);
-    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
-    this.valueDeserializer.open(buffer);
-    hasMore = input.next();
-    this.keyClass = keyClass;
-    this.valueClass = valueClass;
-    this.conf = conf;
-    this.taskid = taskid;
-  }
-
-  /** Start processing next unique key. */
-  public boolean nextKey() throws IOException,InterruptedException {
-    while (hasMore && nextKeyIsSame) {
-      nextKeyValue();
-    }
-    if (hasMore) {
-      if (inputKeyCounter != null) {
-        inputKeyCounter.increment(1);
-      }
-      return nextKeyValue();
-    } else {
-      return false;
-    }
-  }
-
-  /**
-   * Advance to the next key/value pair.
-   */
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (!hasMore) {
-      key = null;
-      value = null;
-      return false;
-    }
-    firstValue = !nextKeyIsSame;
-    DataInputBuffer nextKey = input.getKey();
-    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
-                      nextKey.getLength() - nextKey.getPosition());
-    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
-    key = keyDeserializer.deserialize(key);
-    DataInputBuffer nextVal = input.getValue();
-    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength());
-    value = valueDeserializer.deserialize(value);
-
-    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
-    currentValueLength = nextVal.getLength() - nextVal.getPosition();
-
-    if (isMarked) {
-      backupStore.write(nextKey, nextVal);
-    }
-
-    hasMore = input.next();
-    if (hasMore) {
-      nextKey = input.getKey();
-      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
-                                     currentRawKey.getLength(),
-                                     nextKey.getData(),
-                                     nextKey.getPosition(),
-                                     nextKey.getLength() - nextKey.getPosition()
-                                         ) == 0;
-    } else {
-      nextKeyIsSame = false;
-    }
-    inputValueCounter.increment(1);
-    return true;
-  }
-
-  public KEYIN getCurrentKey() {
-    return key;
-  }
-
-  @Override
-  public VALUEIN getCurrentValue() {
-    return value;
-  }
-  
-  BackupStore<KEYIN,VALUEIN> getBackupStore() {
-    return backupStore;
-  }
-  
-  protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {
-
-    private boolean inReset = false;
-    private boolean clearMarkFlag = false;
-
-    public boolean hasNext() {
-      try {
-        if (inReset && backupStore.hasNext()) {
-          return true;
-        } 
-      } catch (Exception e) {
-        e.printStackTrace();
-        throw new RuntimeException("hasNext failed", e);
-      }
-      return firstValue || nextKeyIsSame;
-    }
-
-    public VALUEIN next() {
-      if (inReset) {
-        try {
-          if (backupStore.hasNext()) {
-            backupStore.next();
-            DataInputBuffer next = backupStore.nextValue();
-            buffer.reset(next.getData(), next.getPosition(), next.getLength());
-            value = valueDeserializer.deserialize(value);
-            return value;
-          } else {
-            inReset = false;
-            backupStore.exitResetMode();
-            if (clearMarkFlag) {
-              clearMarkFlag = false;
-              isMarked = false;
-            }
-          }
-        } catch (IOException e) {
-          e.printStackTrace();
-          throw new RuntimeException("next value iterator failed", e);
-        }
-      } 
-
-      // if this is the first record, we don't need to advance
-      if (firstValue) {
-        firstValue = false;
-        return value;
-      }
-      // if this isn't the first record and the next key is different, they
-      // can't advance it here.
-      if (!nextKeyIsSame) {
-        throw new NoSuchElementException("iterate past last value");
-      }
-      // otherwise, go to the next key/value pair
-      try {
-        nextKeyValue();
-        return value;
-      } catch (IOException ie) {
-        throw new RuntimeException("next value iterator failed", ie);
-      } catch (InterruptedException ie) {
-        // this is bad, but we can't modify the exception list of java.util
-        throw new RuntimeException("next value iterator interrupted", ie);        
-      }
-    }
-
-    public void remove() {
-      throw new UnsupportedOperationException("remove not implemented");
-    }
-
-    public void mark() throws IOException {
-      if (getBackupStore() == null) {
-        backupStore = new BackupStore<KEYIN,VALUEIN>(conf, taskid);
-      }
-      isMarked = true;
-      if (!inReset) {
-        backupStore.reinitialize();
-        if (currentKeyLength == -1) {
-          // The user has not called next() for this iterator yet, so
-          // there is no current record to mark and copy to backup store.
-          return;
-        }
-        assert (currentValueLength != -1);
-        int requestedSize = currentKeyLength + currentValueLength + 
-          WritableUtils.getVIntSize(currentKeyLength) +
-          WritableUtils.getVIntSize(currentValueLength);
-        DataOutputStream out = backupStore.getOutputStream(requestedSize);
-        writeFirstKeyValueBytes(out);
-        backupStore.updateCounters(requestedSize);
-      } else {
-        backupStore.mark();
-      }
-    }
-
-    public void reset() throws IOException {
-      // We reached the end of an iteration and user calls a 
-      // reset, but a clearMark was called before, just throw
-      // an exception
-      if (clearMarkFlag) {
-        clearMarkFlag = false;
-        backupStore.clearMark();
-        throw new IOException("Reset called without a previous mark");
-      }
-      
-      if (!isMarked) {
-        throw new IOException("Reset called without a previous mark");
-      }
-      inReset = true;
-      backupStore.reset();
-    }
-
-    public void clearMark() throws IOException {
-      if (getBackupStore() == null) {
-        return;
-      }
-      if (inReset) {
-        clearMarkFlag = true;
-        backupStore.clearMark();
-      } else {
-        inReset = isMarked = false;
-        backupStore.reinitialize();
-      }
-    }
-    
-    /**
-     * This method is called when the reducer moves from one key to 
-     * another.
-     * @throws IOException
-     */
-    public void resetBackupStore() throws IOException {
-      if (getBackupStore() == null) {
-        return;
-      }
-      inReset = isMarked = false;
-      backupStore.reinitialize();
-      currentKeyLength = -1;
-    }
-
-    /**
-     * This method is called to write the record that was most recently
-     * served (before a call to the mark). Since the framework reads one
-     * record in advance, to get this record, we serialize the current key
-     * and value
-     * @param out
-     * @throws IOException
-     */
-    private void writeFirstKeyValueBytes(DataOutputStream out) 
-    throws IOException {
-      assert (getCurrentKey() != null && getCurrentValue() != null);
-      WritableUtils.writeVInt(out, currentKeyLength);
-      WritableUtils.writeVInt(out, currentValueLength);
-      Serializer<KEYIN> keySerializer = 
-        serializationFactory.getSerializer(keyClass);
-      keySerializer.open(out);
-      keySerializer.serialize(getCurrentKey());
-
-      Serializer<VALUEIN> valueSerializer = 
-        serializationFactory.getSerializer(valueClass);
-      valueSerializer.open(out);
-      valueSerializer.serialize(getCurrentValue());
-    }
-  }
-
-  protected class ValueIterable implements Iterable<VALUEIN> {
-    private ValueIterator iterator = new ValueIterator();
-    public Iterator<VALUEIN> iterator() {
-      return iterator;
-    } 
-  }
-  
-  /**
-   * Iterate through the values for the current key, reusing the same value 
-   * object, which is stored in the context.
-   * @return the series of values associated with the current key. All of the 
-   * objects returned directly and indirectly from this method are reused.
-   */
-  public 
-  Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
-    return iterable;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
index 7b69872..d8548a4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
@@ -1,5 +1,4 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
+/* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -21,98 +20,71 @@ package org.apache.tez.mapreduce.hadoop.mapreduce;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.StatusReporter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.tez.mapreduce.hadoop.IDConverter;
-import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.tez.engine.newapi.TezTaskContext;
+import org.apache.tez.mapreduce.common.Utils;
+
+// NOTE: NEWTEZ: This is a copy of org.apache.tez.mapreduce.hadoop.mapred (not mapreduce). mapred likely does not need its own copy of this class.
+// Meant for use by the "mapreduce" API
 
-/**
- * The context for task attempts.
- */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class TaskAttemptContextImpl extends JobContextImpl 
-    implements TaskAttemptContext {
-  private final TaskAttemptID taskId;
-  private String status = "";
-  private MRTaskReporter reporter;
+public class TaskAttemptContextImpl
+       extends org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl {
 
-  public TaskAttemptContextImpl(Configuration conf, 
-                                TaskAttemptID taskId) {
-    this(conf, taskId, null);
-  }
+  private TezTaskContext taskContext;
 
+  // FIXME we need to use DAG Id but we are using App Id
   public TaskAttemptContextImpl(Configuration conf,
-      TaskAttemptID taskId, MRTaskReporter reporter) {
-    super(conf, IDConverter.fromMRJobId(taskId.getJobID()));
-    this.taskId = taskId;
-    this.reporter = reporter;
+      TezTaskContext taskContext, boolean isMap) {
+    // TODO NEWTEZ Can the jt Identifier string be taskContext.getUniqueId ?
+    this(conf, new TaskAttemptID(
+        new TaskID(String.valueOf(taskContext.getApplicationId()
+            .getClusterTimestamp()), taskContext.getApplicationId().getId(),
+            isMap ? TaskType.MAP : TaskType.REDUCE,
+            taskContext.getTaskIndex()),
+            taskContext.getTaskAttemptNumber()), taskContext);
   }
 
-  /**
-   * Get the unique name for this task attempt.
-   */
-  public TaskAttemptID getTaskAttemptID() {
-    return taskId;
+  public TaskAttemptContextImpl(Configuration conf, TaskAttemptID taId, TezTaskContext context) {
+    super(conf, taId);
+    this.taskContext = context;
   }
 
-  /**
-   * Get the last set status message.
-   * @return the current status message
-   */
-  public String getStatus() {
-    return status;
+  @Override
+  public float getProgress() {
+    // TODO NEWTEZ Will this break anything ?
+    return 0.0f;
   }
 
+  @Override
   public Counter getCounter(Enum<?> counterName) {
-    return (Counter) reporter.getCounter(counterName);
+    return Utils.getMRCounter(taskContext.getCounters().findCounter(counterName));
   }
 
+  @Override
   public Counter getCounter(String groupName, String counterName) {
-    return (Counter) reporter.getCounter(groupName, counterName);
+    return Utils.getMRCounter(taskContext.getCounters().findCounter(groupName, counterName));
   }
 
   /**
    * Report progress.
    */
+  @Override
   public void progress() {
-    reporter.progress();
-  }
-
-  protected void setStatusString(String status) {
-    this.status = status;
+    // Nothing to do.
   }
 
   /**
    * Set the current status of the task to the given string.
    */
+  @Override
   public void setStatus(String status) {
-    String normalizedStatus = Task.normalizeStatus(status, conf);
-    setStatusString(normalizedStatus);
-    reporter.setStatus(normalizedStatus);
-  }
-
-  public static class DummyReporter extends StatusReporter {
-    public void setStatus(String s) {
-    }
-    public void progress() {
-    }
-    public Counter getCounter(Enum<?> name) {
-      return new Counters().findCounter(name);
-    }
-    public Counter getCounter(String group, String name) {
-      return new Counters().findCounter(group, name);
-    }
-    public float getProgress() {
-      return 0f;
-    }
-  }
-  
-  public float getProgress() {
-    return reporter.getProgress();
+    setStatusString(status);
+    // Nothing to do until InputContext supports some kind of custom string
+    // diagnostics.
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
index ff4e18f..c2920dc 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.engine.newapi.TezTaskContext;
 
 /**
  * A context object that allows input and output from the task. It is only
@@ -50,8 +50,8 @@ public abstract class TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
   public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid,
                                     RecordWriter<KEYOUT,VALUEOUT> output,
                                     OutputCommitter committer,
-                                    MRTaskReporter reporter) {
-    super(conf, taskid, reporter);
+                                    TezTaskContext context) {
+    super(conf, taskid, context);
     this.output = output;
     this.committer = committer;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapred/MRReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapred/MRReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapred/MRReporter.java
deleted file mode 100644
index df23b9f..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapred/MRReporter.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.apache.tez.mapreduce.hadoop.newmapred;
-
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.tez.engine.newapi.TezProcessorContext;
-import org.apache.tez.engine.newapi.TezTaskContext;
-import org.apache.tez.mapreduce.common.Utils;
-
-public class MRReporter implements Reporter {
-
-  private TezTaskContext tezTaskContext;
-  private InputSplit split;
-  private boolean isProcessorContext = false;
-  
-  public MRReporter(TezProcessorContext tezProcContext) {
-    this(tezProcContext, null);
-    isProcessorContext = true;
-  }
-  public MRReporter(TezTaskContext tezTaskContext) {
-    this(tezTaskContext, null);
-  }
-
-  public MRReporter(TezTaskContext tezTaskContext, InputSplit split) {
-    this.tezTaskContext = tezTaskContext;
-    this.split = split;
-  }
-  
-  @Override
-  public void progress() {
-    //TODO NEWTEZ
-  }
-
-  @Override
-  public void setStatus(String status) {
-    // Not setting status string in Tez.
-
-  }
-
-  @Override
-  public Counter getCounter(Enum<?> name) {
-    return Utils.getMRCounter(tezTaskContext.getCounters().findCounter(name));
-  }
-
-  @Override
-  public Counter getCounter(String group, String name) {
-    return Utils.getMRCounter(tezTaskContext.getCounters().findCounter(group,
-        name));
-  }
-
-  @Override
-  public void incrCounter(Enum<?> key, long amount) {
-    getCounter(key).increment(amount);
-  }
-
-  @Override
-  public void incrCounter(String group, String counter, long amount) {
-    getCounter(group, counter).increment(amount);
-  }
-
-  @Override
-  public InputSplit getInputSplit() throws UnsupportedOperationException {
-    if (split == null) {
-      throw new UnsupportedOperationException("Input only available on map");
-    } else {
-      return split;
-    }
-  }
-
-  @Override
-  public float getProgress() {
-    // TOOD NEWTEZ Does this make a difference to anything ?
-    return 0.0f;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapred/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapred/TaskAttemptContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapred/TaskAttemptContextImpl.java
deleted file mode 100644
index 956fcc2..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapred/TaskAttemptContextImpl.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.hadoop.newmapred;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.util.Progressable;
-import org.apache.tez.mapreduce.newprocessor.MRTaskReporter;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TaskAttemptContextImpl
-       extends org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 
-       implements TaskAttemptContext {
-  private MRTaskReporter reporter;
-
-  // FIXME we need to use DAG Id but we are using App Id
-  public TaskAttemptContextImpl(JobConf conf, TaskAttemptID taskAttemptId,
-                         MRTaskReporter reporter) {
-    super(conf, taskAttemptId);
-    this.reporter = reporter;
-  }
-  
-  /**
-   * Get the taskAttemptID.
-   *  
-   * @return TaskAttemptID
-   */
-  public TaskAttemptID getTaskAttemptID() {
-    return (TaskAttemptID) super.getTaskAttemptID();
-  }
-  
-  public Progressable getProgressible() {
-    return reporter;
-  }
-  
-  public JobConf getJobConf() {
-    return (JobConf) getConfiguration();
-  }
-  
-  @Override
-  public float getProgress() {
-    return reporter.getProgress();
-  }
-
-  @Override
-  public Counter getCounter(Enum<?> counterName) {
-    return (Counter) reporter.getCounter(counterName);
-  }
-
-  @Override
-  public Counter getCounter(String groupName, String counterName) {
-    return (Counter) reporter.getCounter(groupName, counterName);
-  }
-
-  /**
-   * Report progress.
-   */
-  @Override
-  public void progress() {
-    reporter.progress();
-  }
-
-  /**
-   * Set the current status of the task to the given string.
-   */
-  @Override
-  public void setStatus(String status) {
-    setStatusString(status);
-    reporter.setStatus(status);
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/MapContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/MapContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/MapContextImpl.java
deleted file mode 100644
index 39c7194..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/MapContextImpl.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.hadoop.newmapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.tez.engine.newapi.TezTaskContext;
-
-/**
- * The context that is given to the {@link Mapper}.
- * @param <KEYIN> the key input type to the Mapper
- * @param <VALUEIN> the value input type to the Mapper
- * @param <KEYOUT> the key output type from the Mapper
- * @param <VALUEOUT> the value output type from the Mapper
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class MapContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
-    extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
-    implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
-  private RecordReader<KEYIN,VALUEIN> reader;
-  private InputSplit split;
-
-  public MapContextImpl(Configuration conf, TaskAttemptID taskid,
-                        RecordReader<KEYIN,VALUEIN> reader,
-                        RecordWriter<KEYOUT,VALUEOUT> writer,
-                        OutputCommitter committer,
-                        TezTaskContext context,
-                        InputSplit split) {
-    super(conf, taskid, writer, committer, context);
-    this.reader = reader;
-    this.split = split;
-  }
-
-  /**
-   * Get the input split for this map.
-   */
-  public InputSplit getInputSplit() {
-    return split;
-  }
-
-  @Override
-  public KEYIN getCurrentKey() throws IOException, InterruptedException {
-    return reader.getCurrentKey();
-  }
-
-  @Override
-  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
-    return reader.getCurrentValue();
-  }
-
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    return reader.nextKeyValue();
-  }
-
-}
-     

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
deleted file mode 100644
index f2057e9..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.hadoop.newmapreduce;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.tez.engine.newapi.TezTaskContext;
-import org.apache.tez.mapreduce.common.Utils;
-
-// NOTE: NEWTEZ: This is a copy of org.apache.tez.mapreduce.hadoop.mapred (not mapreduce). mapred likely does not need its own copy of this class.
-// Meant for use by the "mapreduce" API
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TaskAttemptContextImpl
-       extends org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl {
-
-  private TezTaskContext taskContext;
-
-  // FIXME we need to use DAG Id but we are using App Id
-  public TaskAttemptContextImpl(Configuration conf,
-      TezTaskContext taskContext, boolean isMap) {
-    // TODO NEWTEZ Can the jt Identifier string be taskContext.getUniqueId ?
-    this(conf, new TaskAttemptID(
-        new TaskID(String.valueOf(taskContext.getApplicationId()
-            .getClusterTimestamp()), taskContext.getApplicationId().getId(),
-            isMap ? TaskType.MAP : TaskType.REDUCE,
-            taskContext.getTaskIndex()),
-            taskContext.getTaskAttemptNumber()), taskContext);
-  }
-
-  public TaskAttemptContextImpl(Configuration conf, TaskAttemptID taId, TezTaskContext context) {
-    super(conf, taId);
-    this.taskContext = context;
-  }
-
-  @Override
-  public float getProgress() {
-    // TODO NEWTEZ Will this break anything ?
-    return 0.0f;
-  }
-
-  @Override
-  public Counter getCounter(Enum<?> counterName) {
-    return Utils.getMRCounter(taskContext.getCounters().findCounter(counterName));
-  }
-
-  @Override
-  public Counter getCounter(String groupName, String counterName) {
-    return Utils.getMRCounter(taskContext.getCounters().findCounter(groupName, counterName));
-  }
-
-  /**
-   * Report progress.
-   */
-  @Override
-  public void progress() {
-    // Nothing to do.
-  }
-
-  /**
-   * Set the current status of the task to the given string.
-   */
-  @Override
-  public void setStatus(String status) {
-    setStatusString(status);
-    // Nothing to do until InputContext supports some kind of custom string
-    // diagnostics.
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskInputOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskInputOutputContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskInputOutputContextImpl.java
deleted file mode 100644
index a34cf8d..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskInputOutputContextImpl.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.hadoop.newmapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.tez.engine.newapi.TezTaskContext;
-
-/**
- * A context object that allows input and output from the task. It is only
- * supplied to the {@link Mapper} or {@link Reducer}.
- * @param <KEYIN> the input key type for the task
- * @param <VALUEIN> the input value type for the task
- * @param <KEYOUT> the output key type for the task
- * @param <VALUEOUT> the output value type for the task
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public abstract class TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
-       extends TaskAttemptContextImpl 
-       implements TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
-  private RecordWriter<KEYOUT,VALUEOUT> output;
-  private OutputCommitter committer;
-
-  public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid,
-                                    RecordWriter<KEYOUT,VALUEOUT> output,
-                                    OutputCommitter committer,
-                                    TezTaskContext context) {
-    super(conf, taskid, context);
-    this.output = output;
-    this.committer = committer;
-  }
-
-  /**
-   * Advance to the next key, value pair, returning null if at end.
-   * @return the key object that was read into, or null if no more
-   */
-  public abstract 
-  boolean nextKeyValue() throws IOException, InterruptedException;
- 
-  /**
-   * Get the current key.
-   * @return the current key object or null if there isn't one
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public abstract 
-  KEYIN getCurrentKey() throws IOException, InterruptedException;
-
-  /**
-   * Get the current value.
-   * @return the value object that was read into
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public abstract VALUEIN getCurrentValue() throws IOException, 
-                                                   InterruptedException;
-
-  /**
-   * Generate an output key/value pair.
-   */
-  public void write(KEYOUT key, VALUEOUT value
-                    ) throws IOException, InterruptedException {
-    output.write(key, value);
-  }
-
-  public OutputCommitter getOutputCommitter() {
-    return committer;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/ShuffledMergedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/ShuffledMergedInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/ShuffledMergedInputLegacy.java
new file mode 100644
index 0000000..2d230d6
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/ShuffledMergedInputLegacy.java
@@ -0,0 +1,29 @@
+/**
+ * <code>ShuffleMergedInput</code> in a {@link LogicalInput} which shuffles
+ * intermediate sorted data, merges them and provides key/<values> to the
+ * consumer.
+ * 
+ * The Copy and Merge will be triggered by the initialization - which is handled
+ * by the Tez framework. Input is not consumable until the Copy and Merge are
+ * complete. Methods are provided to check for this, as well as to wait for
+ * completion. Attempting to get a reader on a non-complete input will block.
+ * 
+ */
+
+package org.apache.tez.mapreduce.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+
+public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
+
+  @Private
+  public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException {
+    // wait for input so that iterator is available
+    waitForInputReady();
+    return rawIter;
+  }
+}


Mime
View raw message