tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-474. Fix TestMapProcessor and TestReduceProcessor unit tests (part of TEZ-398). (sseth)
Date Tue, 24 Sep 2013 06:24:00 GMT
Updated Branches:
  refs/heads/TEZ-398 d316f7235 -> c5a8a3c6e


TEZ-474. Fix TestMapProcessor and TestReduceProcessor unit tests (part
of TEZ-398). (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c5a8a3c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c5a8a3c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c5a8a3c6

Branch: refs/heads/TEZ-398
Commit: c5a8a3c6ee72dcf5ec34e28fd87d2685e8b9bb1d
Parents: d316f72
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Sep 23 23:23:39 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Sep 23 23:23:39 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/TezJobConfig.java     |   1 +
 .../apache/hadoop/mapred/YarnOutputFiles.java   | 236 ------------------
 .../tez/engine/lib/input/LocalMergedInput.java  |   8 +-
 .../engine/lib/input/ShuffledMergedInput.java   |   2 +-
 .../lib/input/ShuffledMergedInputLegacy.java    |  30 +++
 .../lib/output/LocalOnFileSorterOutput.java     |   7 +-
 .../engine/lib/output/OnFileSortedOutput.java   |   4 +
 .../LogicalIOProcessorRuntimeTask.java          |  46 ++++
 .../tez/mapreduce/examples/MRRSleepJob.java     |   2 +-
 .../mapreduce/examples/OrderedWordCount.java    |   2 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |   2 +-
 .../input/ShuffledMergedInputLegacy.java        |  29 ---
 .../apache/tez/mapreduce/processor/MRTask.java  |   5 -
 .../processor/reduce/ReduceProcessor.java       |   2 +-
 .../mapreduce/task/impl/YarnOutputFiles.java    | 239 -------------------
 .../org/apache/tez/mapreduce/TestUmbilical.java |  62 +++++
 .../tez/mapreduce/TestUmbilicalProtocol.java    |  91 -------
 .../tez/mapreduce/processor/MapUtils.java       |  28 ++-
 .../processor/map/TestMapProcessor.java         |  31 ++-
 .../processor/reduce/TestReduceProcessor.java   |  59 +++--
 .../org/apache/tez/mapreduce/YARNRunner.java    |   2 +-
 21 files changed, 233 insertions(+), 655 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index 7c4540c..2c4b911 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -64,6 +64,7 @@ public class TezJobConfig {
   /**
    * List of directories available to the engine.
    */
+  @Private
   public static final String LOCAL_DIRS = "tez.engine.local.dirs";
   public static final String DEFAULT_LOCAL_DIRS = "/tmp";
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java
deleted file mode 100644
index e43cf47..0000000
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.MRConfig;
-
-/**
- * Manipulate the working area for the transient store for maps and reduces.
- *
- * This class is used by map and reduce tasks to identify the directories that
- * they need to write to/read from for intermediate files. The callers of
- * these methods are from child space.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class YarnOutputFiles extends MapOutputFile {
-
-  private JobConf conf;
-
-  private static final String JOB_OUTPUT_DIR = "output";
-  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
-  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
-      + ".index";
-
-  public YarnOutputFiles() {
-  }
-
-  // assume configured to $localdir/usercache/$user/appcache/$appId
-  private LocalDirAllocator lDirAlloc = 
-    new LocalDirAllocator(MRConfig.LOCAL_DIR);
-
-  private Path getAttemptOutputDir() {
-    return new Path(JOB_OUTPUT_DIR, conf.get(JobContext.TASK_ATTEMPT_ID));
-  }
-  
-  /**
-   * Return the path to local map output file created earlier
-   * 
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputFile() throws IOException {
-    Path attemptOutput =
-      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING);
-    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
-  }
-
-  /**
-   * Create a local map output file name.
-   * 
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputFileForWrite(long size) throws IOException {
-    Path attemptOutput = 
-      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING);
-    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
-  }
-
-  /**
-   * Create a local map output file name on the same volume.
-   */
-  public Path getOutputFileForWriteInVolume(Path existing) {
-    Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
-    Path attemptOutputDir = new Path(outputDir,
-        conf.get(JobContext.TASK_ATTEMPT_ID));
-    return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING);
-  }
-
-  /**
-   * Return the path to a local map output index file created earlier
-   * 
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputIndexFile() throws IOException {
-    Path attemptIndexOutput =
-      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING +
-                                      MAP_OUTPUT_INDEX_SUFFIX_STRING);
-    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
-  }
-
-  /**
-   * Create a local map output index file name.
-   * 
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputIndexFileForWrite(long size) throws IOException {
-    Path attemptIndexOutput =
-      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING +
-                                      MAP_OUTPUT_INDEX_SUFFIX_STRING);
-    return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
-        size, conf);
-  }
-
-  /**
-   * Create a local map output index file name on the same volume.
-   */
-  public Path getOutputIndexFileForWriteInVolume(Path existing) {
-    Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
-    Path attemptOutputDir = new Path(outputDir,
-        conf.get(JobContext.TASK_ATTEMPT_ID));
-    return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING +
-                                      MAP_OUTPUT_INDEX_SUFFIX_STRING);
-  }
-
-  /**
-   * Return a local map spill file created earlier.
-   * 
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException
-   */
-  public Path getSpillFile(int spillNumber) throws IOException {
-    return lDirAlloc.getLocalPathToRead(
-        String.format(SPILL_FILE_PATTERN,
-            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf);
-  }
-
-  /**
-   * Create a local map spill file name.
-   * 
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getSpillFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(
-        String.format(String.format(SPILL_FILE_PATTERN,
-            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber)), size, conf);
-  }
-
-  /**
-   * Return a local map spill index file created earlier
-   * 
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException
-   */
-  public Path getSpillIndexFile(int spillNumber) throws IOException {
-    return lDirAlloc.getLocalPathToRead(
-        String.format(SPILL_INDEX_FILE_PATTERN,
-            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf);
-  }
-
-  /**
-   * Create a local map spill index file name.
-   * 
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getSpillIndexFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(
-        String.format(SPILL_INDEX_FILE_PATTERN,
-            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), size, conf);
-  }
-
-  /**
-   * Return a local reduce input file created earlier
-   * 
-   * @param mapId a map task id
-   * @return path
-   * @throws IOException 
-   */
-  public Path getInputFile(int mapId) throws IOException {
-    throw new UnsupportedOperationException("Incompatible with LocalRunner");
-  }
-
-  /**
-   * Create a local reduce input file name.
-   * 
-   * @param mapId a map task id
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
-      long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(String.format(
-        REDUCE_INPUT_FILE_FORMAT_STRING,
-        getAttemptOutputDir().toString(), mapId.getId()),
-        size, conf);
-  }
-
-  /** Removes all of the files related to a task. */
-  public void removeAll() throws IOException {
-    throw new UnsupportedOperationException("Incompatible with LocalRunner");
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    if (conf instanceof JobConf) {
-      this.conf = (JobConf) conf;
-    } else {
-      this.conf = new JobConf(conf);
-    }
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
index ed57c61..6371787 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
@@ -31,11 +31,7 @@ import org.apache.tez.engine.common.localshuffle.LocalShuffle;
  * <code>LocalMergedInput</code> is a {@link LogicalInput} which shuffles intermediate
  * sorted data, merges them and provides key/<values> to the consumer. 
  */
-public class LocalMergedInput extends ShuffledMergedInput {
-
-
-  // TODO NEWTEZ Fix CombineProcessor
-  //private CombineInput raw;
+public class LocalMergedInput extends ShuffledMergedInputLegacy {
 
   @Override
   public List<Event> initialize(TezInputContext inputContext) throws IOException {
@@ -43,8 +39,8 @@ public class LocalMergedInput extends ShuffledMergedInput {
     this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
 
     LocalShuffle localShuffle = new LocalShuffle(inputContext, conf, numInputs);
-    // TODO NEWTEZ async run and checkIfComplete methods
     rawIter = localShuffle.run();
+    createValuesIterator();
     return Collections.emptyList();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
index 3db0632..a984b0f 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
@@ -156,7 +156,7 @@ public class ShuffledMergedInput implements LogicalInput {
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
-  private void createValuesIterator()
+  protected void createValuesIterator()
       throws IOException {
     vIter = new ValuesIterator(rawIter,
         (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf),

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java
new file mode 100644
index 0000000..f2da031
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java
@@ -0,0 +1,30 @@
+/**
+ * <code>ShuffleMergedInput</code> is a {@link LogicalInput} which shuffles
+ * intermediate sorted data, merges them and provides key/<values> to the
+ * consumer.
+ * 
+ * The Copy and Merge will be triggered by the initialization - which is handled
+ * by the Tez framework. Input is not consumable until the Copy and Merge are
+ * complete. Methods are provided to check for this, as well as to wait for
+ * completion. Attempting to get a reader on a non-complete input will block.
+ * 
+ */
+
+package org.apache.tez.engine.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+@LimitedPrivate("mapreduce")
+public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
+
+  @Private
+  public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException {
+    // wait for input so that iterator is available
+    waitForInputReady();
+    return rawIter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
index b24e10d..7fd26d7 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
@@ -48,11 +48,16 @@ public class LocalOnFileSorterOutput extends OnFileSortedOutput {
             outputContext.getTaskIndex(),
             localFs.getFileStatus(src).getLen());
 
+    LOG.info("Renaming src = " + src + ", dst = " + dst);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Renaming src = " + src + ", dst = " + dst);
     }
     localFs.rename(src, dst);
-    // TODO NEWTEZ Event generation.
+    return null;
+  }
+  
+  @Override
+  protected List<Event> generateDataMovementEventsOnClose() throws IOException {
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
index 685722e..9c9eba0 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
@@ -93,6 +93,10 @@ public class OnFileSortedOutput implements LogicalOutput {
     sorter.close();
     this.endTime = System.nanoTime();
 
+   return generateDataMovementEventsOnClose();
+  }
+  
+  protected List<Event> generateDataMovementEventsOnClose() throws IOException {
     String host = System.getenv(ApplicationConstants.Environment.NM_HOST
         .toString());
     ByteBuffer shuffleMetadata = outputContext

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index bfd898b..29063f9 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -61,6 +61,7 @@ import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.shuffle.common.ShuffleUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 @Private
@@ -75,6 +76,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private final List<OutputSpec> outputSpecs;
   private final List<LogicalOutput> outputs;
 
+  private List<TezInputContext> inputContexts;
+  private List<TezOutputContext> outputContexts;
+  private TezProcessorContext processorContext;
+  
   private final ProcessorDescriptor processorDescriptor;
   private final LogicalIOProcessor processor;
 
@@ -95,6 +100,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     super(taskSpec, tezConf, tezUmbilical);
     LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
         + taskSpec);
+    this.inputContexts = new ArrayList<TezInputContext>(taskSpec.getInputs().size());
+    this.outputContexts = new ArrayList<TezOutputContext>(taskSpec.getOutputs().size());
     this.inputSpecs = taskSpec.getInputs();
     this.inputs = createInputs(inputSpecs);
     this.outputSpecs = taskSpec.getOutputs();
@@ -185,6 +192,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private void initializeInput(Input input, InputSpec inputSpec)
       throws Exception {
     TezInputContext tezInputContext = createInputContext(inputSpec);
+    inputContexts.add(tezInputContext);
     if (input instanceof LogicalInput) {
       ((LogicalInput) input).setNumPhysicalInputs(inputSpec
           .getPhysicalEdgeCount());
@@ -199,6 +207,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private void initializeOutput(Output output, OutputSpec outputSpec)
       throws Exception {
     TezOutputContext tezOutputContext = createOutputContext(outputSpec);
+    outputContexts.add(tezOutputContext);
     if (output instanceof LogicalOutput) {
       ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec
           .getPhysicalEdgeCount());
@@ -215,6 +224,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     LOG.info("Initializing processor"
         + ", processorClassName=" + processorDescriptor.getClassName());
     TezProcessorContext processorContext = createProcessorContext();
+    this.processorContext = processorContext;
     processor.initialize(processorContext);
   }
 
@@ -425,5 +435,41 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       eventRouterThread.interrupt();
     }
   }
+  
+  @Private
+  @VisibleForTesting
+  public List<TezInputContext> getInputContexts() {
+    return this.inputContexts;
+  }
+  
+  @Private
+  @VisibleForTesting
+  public List<TezOutputContext> getOutputContexts() {
+    return this.outputContexts;
+  }
 
+  @Private
+  @VisibleForTesting
+  public TezProcessorContext getProcessorContext() {
+    return this.processorContext;
+  }
+  
+  @Private
+  @VisibleForTesting
+  public Map<String, LogicalInput> getInputs() {
+    return this.inputMap;
+  }
+  
+  @Private
+  @VisibleForTesting
+  public Map<String, LogicalOutput> getOutputs() {
+    return this.outputMap;
+  }
+  
+  @Private
+  @VisibleForTesting
+  public LogicalIOProcessor getProcessor() {
+    return this.processor;
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 429d458..05675b5 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -80,12 +80,12 @@ import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.engine.common.objectregistry.ObjectLifeCycle;
 import org.apache.tez.engine.common.objectregistry.ObjectRegistry;
 import org.apache.tez.engine.common.objectregistry.ObjectRegistryFactory;
+import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy;
 import org.apache.tez.engine.lib.output.OnFileSortedOutput;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.tez.mapreduce.input.ShuffledMergedInputLegacy;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 07fe58a..ec419c1 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -70,12 +70,12 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy;
 import org.apache.tez.engine.lib.output.OnFileSortedOutput;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
-import org.apache.tez.mapreduce.input.ShuffledMergedInputLegacy;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index aca5b8e..7e662cb 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -68,6 +68,7 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatus.State;
+import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy;
 import org.apache.tez.engine.lib.output.OnFileSortedOutput;
 import org.apache.tez.mapreduce.examples.MRRSleepJob;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer;
@@ -79,7 +80,6 @@ import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
-import org.apache.tez.mapreduce.input.ShuffledMergedInputLegacy;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 import org.junit.AfterClass;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/ShuffledMergedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/ShuffledMergedInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/ShuffledMergedInputLegacy.java
deleted file mode 100644
index 2d230d6..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/ShuffledMergedInputLegacy.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * <code>ShuffleMergedInput</code> in a {@link LogicalInput} which shuffles
- * intermediate sorted data, merges them and provides key/<values> to the
- * consumer.
- * 
- * The Copy and Merge will be triggered by the initialization - which is handled
- * by the Tez framework. Input is not consumable until the Copy and Merge are
- * complete. Methods are provided to check for this, as well as to wait for
- * completion. Attempting to get a reader on a non-complete input will block.
- * 
- */
-
-package org.apache.tez.mapreduce.input;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
-
-public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
-
-  @Private
-  public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException {
-    // wait for input so that iterator is available
-    waitForInputReady();
-    return rawIter;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index 1a01466..f7404d4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.TaskAttemptID;
@@ -83,7 +82,6 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl;
 import org.apache.tez.mapreduce.output.SimpleOutput;
-import org.apache.tez.mapreduce.task.impl.YarnOutputFiles;
 
 @SuppressWarnings("deprecation")
 public abstract class MRTask {
@@ -204,9 +202,6 @@ public abstract class MRTask {
     // Containers.
     // Set it in conf, so as to be able to be used by the OutputCommitter.
 
-    jobConf.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, YarnOutputFiles.class,
-        MapOutputFile.class); // MR
-
     // Not needed. This is probably being set via the source/consumer meta
     Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(credentials);
     if (jobToken != null) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 9210187..9274765 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -46,8 +46,8 @@ import org.apache.tez.engine.api.LogicalOutput;
 import org.apache.tez.engine.api.TezProcessorContext;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy;
 import org.apache.tez.engine.lib.output.OnFileSortedOutput;
-import org.apache.tez.mapreduce.input.ShuffledMergedInputLegacy;
 import org.apache.tez.mapreduce.output.SimpleOutput;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java
deleted file mode 100644
index e28e474..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce.task.impl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapOutputFile;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.tez.common.Constants;
-
-/**
- * Manipulate the working area for the transient store for maps and reduces.
- *
- * This class is used by map and reduce tasks to identify the directories that
- * they need to write to/read from for intermediate files. The callers of
- * these methods are from child space.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class YarnOutputFiles extends MapOutputFile {
-
-  private JobConf conf;
-
-  private static final String JOB_OUTPUT_DIR = "output";
-  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
-  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
-      + ".index";
-
-  public YarnOutputFiles() {
-  }
-
-  // assume configured to $localdir/usercache/$user/appcache/$appId
-  private LocalDirAllocator lDirAlloc = 
-    new LocalDirAllocator(MRConfig.LOCAL_DIR);
-
-  private Path getAttemptOutputDir() {
-    return new Path(JOB_OUTPUT_DIR, conf.get(JobContext.TASK_ATTEMPT_ID));
-  }
-  
-  /**
-   * Return the path to local map output file created earlier
-   * 
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputFile() throws IOException {
-    Path attemptOutput =
-      new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING);
-    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
-  }
-
-  /**
-   * Create a local map output file name.
-   * 
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputFileForWrite(long size) throws IOException {
-    Path attemptOutput = 
-      new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING);
-    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
-  }
-
-  /**
-   * Create a local map output file name on the same volume.
-   */
-  public Path getOutputFileForWriteInVolume(Path existing) {
-    Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
-    Path attemptOutputDir = new Path(outputDir,
-        conf.get(JobContext.TASK_ATTEMPT_ID));
-    return new Path(attemptOutputDir, Constants.MAP_OUTPUT_FILENAME_STRING);
-  }
-
-  /**
-   * Return the path to a local map output index file created earlier
-   * 
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputIndexFile() throws IOException {
-    Path attemptIndexOutput =
-      new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING +
-          Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING);
-    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
-  }
-
-  /**
-   * Create a local map output index file name.
-   * 
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputIndexFileForWrite(long size) throws IOException {
-    Path attemptIndexOutput =
-      new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING +
-          Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING);
-    return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
-        size, conf);
-  }
-
-  /**
-   * Create a local map output index file name on the same volume.
-   */
-  public Path getOutputIndexFileForWriteInVolume(Path existing) {
-    Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
-    Path attemptOutputDir = new Path(outputDir,
-        conf.get(JobContext.TASK_ATTEMPT_ID));
-    return new Path(attemptOutputDir, Constants.MAP_OUTPUT_FILENAME_STRING +
-        Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING);
-  }
-
-  /**
-   * Return a local map spill file created earlier.
-   * 
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException
-   */
-  public Path getSpillFile(int spillNumber) throws IOException {
-    return lDirAlloc.getLocalPathToRead(
-        String.format(SPILL_FILE_PATTERN,
-            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf);
-  }
-
-  /**
-   * Create a local map spill file name.
-   * 
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getSpillFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(
-        String.format(String.format(SPILL_FILE_PATTERN,
-            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber)), size, conf);
-  }
-
-  /**
-   * Return a local map spill index file created earlier
-   * 
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException
-   */
-  public Path getSpillIndexFile(int spillNumber) throws IOException {
-    return lDirAlloc.getLocalPathToRead(
-        String.format(SPILL_INDEX_FILE_PATTERN,
-            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf);
-  }
-
-  /**
-   * Create a local map spill index file name.
-   * 
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getSpillIndexFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(
-        String.format(SPILL_INDEX_FILE_PATTERN,
-            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), size, conf);
-  }
-
-  /**
-   * Return a local reduce input file created earlier
-   * 
-   * @param mapId a map task id
-   * @return path
-   * @throws IOException 
-   */
-  public Path getInputFile(int mapId) throws IOException {
-    throw new UnsupportedOperationException("Incompatible with LocalRunner");
-  }
-
-  /**
-   * Create a local reduce input file name.
-   * 
-   * @param mapId a map task id
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
-      long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(String.format(
-        Constants.REDUCE_INPUT_FILE_FORMAT_STRING,
-        getAttemptOutputDir().toString(), mapId.getId()),
-        size, conf);
-  }
-
-  /** Removes all of the files related to a task. */
-  public void removeAll() throws IOException {
-    throw new UnsupportedOperationException("Incompatible with LocalRunner");
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    if (conf instanceof JobConf) {
-      this.conf = (JobConf) conf;
-    } else {
-      this.conf = new JobConf(conf);
-    }
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java
new file mode 100644
index 0000000..9de2ed1
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.api.impl.EventMetaData;
+import org.apache.tez.engine.api.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezUmbilical;
+
+public class TestUmbilical implements TezUmbilical {
+
+  private static final Log LOG = LogFactory.getLog(TestUmbilical.class);
+
+  public TestUmbilical() {
+  }
+
+  @Override
+  public void addEvents(Collection<TezEvent> events) {
+    if (events != null && events.size() > 0) {
+      LOG.info("#Events Received: " + events.size());
+      for (TezEvent event : events) {
+        LOG.info("Event: " + event);
+      }
+    }
+  }
+
+  @Override
+  public void signalFatalError(TezTaskAttemptID taskAttemptID,
+      String diagnostics, EventMetaData sourceInfo) {
+    LOG.info("Received fatal error from task: " + taskAttemptID
+        + ", Diagnostics: " + diagnostics);
+
+  }
+
+  @Override
+  public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
+    LOG.info("Got canCommit from task: " + taskAttemptID);
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
deleted file mode 100644
index d5823f7..0000000
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.common.records.ProceedToCompletionResponse;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.impl.TezHeartbeatRequest;
-import org.apache.tez.engine.api.impl.TezHeartbeatResponse;
-
-public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol {
-
-  private static final Log LOG = LogFactory.getLog(TestUmbilicalProtocol.class);
-  private ProceedToCompletionResponse proceedToCompletionResponse;
-
-
-  public TestUmbilicalProtocol() {
-    proceedToCompletionResponse = new ProceedToCompletionResponse(false, true);
-  }
-
-  public TestUmbilicalProtocol(boolean shouldLinger) {
-    if (shouldLinger) {
-      proceedToCompletionResponse = new ProceedToCompletionResponse(false, false);
-    } else {
-      proceedToCompletionResponse = new ProceedToCompletionResponse(false, true);
-    }
-  }
-
-  @Override
-  public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2)
-      throws IOException {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
-  public long getProtocolVersion(String arg0, long arg1) throws IOException {
-    // TODO Auto-generated method stub
-    return 0;
-  }
-
-  @Override
-  public ContainerTask getTask(ContainerContext containerContext)
-      throws IOException {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
-  public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
-    LOG.info("Got 'can-commit' from " + taskid);
-    return true;
-  }
-
-  @Override
-  public ProceedToCompletionResponse proceedToCompletion(
-      TezTaskAttemptID taskAttemptId) throws IOException {
-    return proceedToCompletionResponse;
-  }
-
-  @Override
-  public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) {
-    // TODO Auto-generated method stub
-    // TODO TODONEWTEZ
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 85e6653..4b2c0e8 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -43,13 +43,16 @@ import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.engine.api.impl.InputSpec;
 import org.apache.tez.engine.api.impl.OutputSpec;
 import org.apache.tez.engine.api.impl.TaskSpec;
 import org.apache.tez.engine.api.impl.TezUmbilical;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.mapreduce.TezTestUtils;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -108,6 +111,7 @@ public class MapUtils {
       throws IOException {
     FileInputFormat.setInputPaths(job, workDir);
 
+    LOG.info("Generating data at path: " + file);
     // create a file with length entries
     @SuppressWarnings("deprecation")
     SequenceFile.Writer writer = 
@@ -147,6 +151,7 @@ public class MapUtils {
       InputSplit split) throws IOException {
     Path jobSplitFile = new Path(conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR,
         TezJobConfig.DEFAULT_TASK_LOCAL_RESOURCE_DIR), MRJobConfig.JOB_SPLIT);
+    LOG.info("Writing split to: " + jobSplitFile);
     FSDataOutputStream out = FileSystem.create(fs, jobSplitFile,
         new FsPermission(JOB_FILE_PERMISSION));
 
@@ -173,17 +178,23 @@ public class MapUtils {
     outMeta.close();
   }
 
-  public static LogicalIOProcessorRuntimeTask runMapProcessor(FileSystem fs, Path workDir,
+  public static void generateInputSplit(FileSystem fs, Path workDir, JobConf jobConf, Path mapInput) throws IOException {
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput);
+    writeSplitFiles(fs, jobConf, split);
+  }
+  
+  public static LogicalIOProcessorRuntimeTask createLogicalTask(FileSystem fs, Path workDir,
       JobConf jobConf, int mapId, Path mapInput,
       TezUmbilical umbilical,
       String vertexName, List<InputSpec> inputSpecs,
       List<OutputSpec> outputSpecs) throws Exception {
     jobConf.setInputFormat(SequenceFileInputFormat.class);
-    InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput);
 
     ProcessorDescriptor mapProcessorDesc = new ProcessorDescriptor(
-        MapProcessor.class.getName());
-    writeSplitFiles(fs, jobConf, split);
+        MapProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(jobConf));
+    
+    Token<JobTokenIdentifier> shuffleToken = new Token<JobTokenIdentifier>();
 
     TaskSpec taskSpec = new TaskSpec(
         TezTestUtils.getMockTaskAttemptId(0, 0, mapId, 0),
@@ -192,16 +203,13 @@ public class MapUtils {
         mapProcessorDesc,
         inputSpecs,
         outputSpecs);
-    
-    // TODO NEWTEZ Fix umbilical access
+
     LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
         taskSpec,
-        1,
+        0,
         jobConf,
         umbilical,
-        null);
-    task.initialize();
-    task.run();
+        shuffleToken);
     return task;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 2ecce8b..06e2f4b 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -30,23 +30,26 @@ import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.engine.api.TezInputContext;
 import org.apache.tez.engine.api.impl.InputSpec;
 import org.apache.tez.engine.api.impl.OutputSpec;
-import org.apache.tez.engine.api.impl.TezUmbilical;
 import org.apache.tez.engine.common.InputAttemptIdentifier;
 import org.apache.tez.engine.common.sort.impl.IFile;
 import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
 import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
 import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.mapreduce.TestUmbilical;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 import org.apache.tez.mapreduce.input.SimpleInputLegacy;
+import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.MapUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -67,6 +70,7 @@ public class TestMapProcessor {
       workDir =
           new Path(new Path(System.getProperty("test.build.data", "/tmp")),
                    "TestMapProcessor").makeQualified(localFs);
+      LOG.info("Using workDir: " + workDir);
       MapUtils.configureLocalDirs(defaultConf, workDir.toString());
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
@@ -79,10 +83,12 @@ public class TestMapProcessor {
 
   public void setUpJobConf(JobConf job) {
     job.set(TezJobConfig.LOCAL_DIRS, workDir.toString());
+    job.set(MRConfig.LOCAL_DIR, workDir.toString());
     job.setClass(
         Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
         TezLocalTaskOutputFiles.class, 
         TezTaskOutput.class);
+    job.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, MRPartitioner.class.getName());
     job.setNumReduceTasks(1);
   }
 
@@ -97,7 +103,6 @@ public class TestMapProcessor {
     String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
     JobConf jobConf = new JobConf(defaultConf);
     setUpJobConf(jobConf);
-    TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(jobConf, "TODONEWTEZ_uniqueId");
 
     Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
     conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
@@ -110,15 +115,27 @@ public class TestMapProcessor {
     job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
         "localized-resources").toUri().toString());
     
-    InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(SimpleInputLegacy.class.getName()), 1);
+    Path mapInput = new Path(workDir, "map0");
+    
+    
+    MapUtils.generateInputSplit(localFs, workDir, job, mapInput);
+    
+    InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(SimpleInputLegacy.class.getName()), 0);
     OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
 
-    // TODO NEWTEZ FIXME TezUmbilical handling
-    LogicalIOProcessorRuntimeTask t = MapUtils.runMapProcessor(localFs, workDir, job, 0,
-        new Path(workDir, "map0"), (TezUmbilical) null, vertexName,
+    LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, job, 0,
+        new Path(workDir, "map0"), new TestUmbilical(), vertexName,
         Collections.singletonList(mapInputSpec),
         Collections.singletonList(mapOutputSpec));
-
+    
+    task.initialize();
+    task.run();
+    task.close();
+    
+    TezInputContext inputContext = task.getInputContexts().get(0);
+    TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(jobConf, inputContext.getUniqueIdentifier());
+    
+    
     // TODO NEWTEZ FIXME OutputCommitter verification
 //    MRTask mrTask = (MRTask)t.getProcessor();
 //    Assert.assertEquals(TezNullOutputCommitter.class.getName(), mrTask

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 1d35f9b..a3abd76 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -31,20 +31,24 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.security.token.Token;
 import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.engine.api.impl.InputSpec;
 import org.apache.tez.engine.api.impl.OutputSpec;
 import org.apache.tez.engine.api.impl.TaskSpec;
-import org.apache.tez.engine.api.impl.TezUmbilical;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
 import org.apache.tez.engine.lib.input.LocalMergedInput;
 import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
 import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.mapreduce.TestUmbilical;
 import org.apache.tez.mapreduce.TezTestUtils;
 import org.apache.tez.mapreduce.hadoop.IDConverter;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -52,8 +56,8 @@ import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 import org.apache.tez.mapreduce.input.SimpleInputLegacy;
 import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.MapUtils;
-import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -75,7 +79,7 @@ public class TestReduceProcessor {
       workDir =
           new Path(new Path(System.getProperty("test.build.data", "/tmp")),
                    "TestReduceProcessor").makeQualified(localFs);
-      
+      LOG.info("Using workDir: " + workDir);
       MapUtils.configureLocalDirs(defaultConf, workDir.toString());
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
@@ -84,10 +88,12 @@ public class TestReduceProcessor {
 
   public void setUpJobConf(JobConf job) {
     job.set(TezJobConfig.LOCAL_DIRS, workDir.toString());
+    job.set(MRConfig.LOCAL_DIR, workDir.toString());
     job.setClass(
         Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
         TezLocalTaskOutputFiles.class, 
         TezTaskOutput.class);
+    job.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, MRPartitioner.class.getName());
     job.setNumReduceTasks(1);
   }
 
@@ -104,10 +110,10 @@ public class TestReduceProcessor {
     String reduceVertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
     JobConf jobConf = new JobConf(defaultConf);
     setUpJobConf(jobConf);
-    TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(jobConf, "TODONEWTEZ_uniqueId");
     
     Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
     conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+    
     Configuration mapStageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
         mapVertexName);
     
@@ -116,19 +122,24 @@ public class TestReduceProcessor {
     mapConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
         "localized-resources").toUri().toString());
     
-    InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(
-        SimpleInputLegacy.class.getName()), 0);
-    OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(
-        LocalOnFileSorterOutput.class.getName()), 1);
+    Path mapInput = new Path(workDir, "map0");
+    MapUtils.generateInputSplit(localFs, workDir, mapConf, mapInput);
+    
+    InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(SimpleInputLegacy.class.getName()), 0);
+    OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
     // Run a map
-    // TODO NEWTEZ FIX Umbilical creation
-    MapUtils.runMapProcessor(localFs, workDir, mapConf, 0,
-        new Path(workDir, "map0"), (TezUmbilical) null, mapVertexName,
+    LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, mapConf, 0,
+        mapInput, new TestUmbilical(), mapVertexName,
         Collections.singletonList(mapInputSpec),
         Collections.singletonList(mapOutputSpec));
 
+    mapTask.initialize();
+    mapTask.run();
+    mapTask.close();
+    
     LOG.info("Starting reduce...");
     
+    Token<JobTokenIdentifier> shuffleToken = new Token<JobTokenIdentifier>();
     
     Configuration reduceStageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
         reduceVertexName);
@@ -138,7 +149,7 @@ public class TestReduceProcessor {
         "localized-resources").toUri().toString());
     FileOutputFormat.setOutputPath(reduceConf, new Path(workDir, "output"));
     ProcessorDescriptor reduceProcessorDesc = new ProcessorDescriptor(
-        ReduceProcessor.class.getName());
+        ReduceProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(reduceConf));
     
     InputSpec reduceInputSpec = new InputSpec(mapVertexName, new InputDescriptor(LocalMergedInput.class.getName()), 1);
     OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex", new OutputDescriptor(SimpleOutput.class.getName()), 1);
@@ -151,28 +162,26 @@ public class TestReduceProcessor {
         reduceProcessorDesc,
         Collections.singletonList(reduceInputSpec),
         Collections.singletonList(reduceOutputSpec));
-    
-    // TODO NEWTEZ FIXME Umbilical and jobToken
+
     LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
         taskSpec,
-        1,
+        0,
         reduceConf,
-        (TezUmbilical) null,
-        null);
+        new TestUmbilical(),
+        shuffleToken);
     
     task.initialize();
     task.run();
-    
-//    MRTask mrTask = (MRTask)t.getProcessor();
-//    TODO NEWTEZ Verify the partitioner has been created
-//    Assert.assertNull(mrTask.getPartitioner());
     task.close();
     
-    // Can this be done via some utility class ? MapOutputFile derivative, or
-    // instantiating the OutputCommitter
-    
+    // MRTask mrTask = (MRTask)t.getProcessor();
+    // TODO NEWTEZ Verify the partitioner has not been created
+    // Likely not applicable anymore.
+    // Assert.assertNull(mrTask.getPartitioner());
+
+
 
-    // TODO NEWTEZ FIXME uniqueId generation and event generation (mockTaskId will not work here)
+    // Only a task commit happens, hence the data is still in the temporary directory.
     Path reduceOutputDir = new Path(new Path(workDir, "output"),
         "_temporary/0/" + IDConverter
             .toMRTaskId(TezTestUtils.getMockTaskId(0, 1, 0)));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 56f9035..6496b55 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -95,13 +95,13 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy;
 import org.apache.tez.engine.lib.output.OnFileSortedOutput;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.tez.mapreduce.input.ShuffledMergedInputLegacy;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 


Mime
View raw message