tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject svn commit: r1469642 [8/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ te...
Date Thu, 18 Apr 2013 23:54:28 GMT
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.api.client.impl.TezBuilderUtils;
+import org.apache.tez.dag.api.impl.VertexContext;
+import org.apache.tez.dag.api.impl.VertexOutputCommitter;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+public class MRVertexOutputCommitter extends VertexOutputCommitter {
+
+  private static final Log LOG = LogFactory.getLog(
+      MRVertexOutputCommitter.class);
+
+  private OutputCommitter committer;
+  private JobContext jobContext;
+  private volatile boolean initialized = false;
+
+  public MRVertexOutputCommitter() {
+  }
+
+  @SuppressWarnings("rawtypes")
+  private OutputCommitter getOutputCommitter(VertexContext context) {
+    Configuration conf = context.getConf();
+    OutputCommitter committer = null;
+    boolean newApiCommitter = false;
+    if (conf.getBoolean("mapred.reducer.new-api", false)
+        || conf.getBoolean("mapred.mapper.new-api", false))  {
+      newApiCommitter = true;
+      LOG.info("Using mapred newApiCommitter.");
+    }
+
+    LOG.info("OutputCommitter set in config "
+        + conf.get("mapred.output.committer.class"));
+
+    if (newApiCommitter) {
+      TezTaskID taskId = TezBuilderUtils.newTaskId(context.getDAGId(),
+          context.getVertexId().getId(), 0);
+      TezTaskAttemptID attemptID =
+          TezBuilderUtils.newTaskAttemptId(taskId, 0);
+      TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+          TezMRTypeConverter.fromTez(attemptID));
+      try {
+        OutputFormat outputFormat = ReflectionUtils.newInstance(taskContext
+            .getOutputFormatClass(), conf);
+        committer = outputFormat.getOutputCommitter(taskContext);
+      } catch (Exception e) {
+        throw new YarnException(e);
+      }
+    } else {
+      committer = ReflectionUtils.newInstance(conf.getClass(
+          "mapred.output.committer.class", FileOutputCommitter.class,
+          org.apache.hadoop.mapred.OutputCommitter.class), conf);
+    }
+    LOG.info("OutputCommitter is " + committer.getClass().getName());
+    return committer;
+  }
+
+  // FIXME we are using ApplicationId as DAG id
+  private JobContext getJobContextFromVertexContext(VertexContext context) {
+    // FIXME when we have the vertex level user-land configuration
+    // jobConf should be initialized using the user-land level configuration
+    // for the vertex in question
+    JobConf jobConf = new JobConf(context.getConf());
+    JobID jobId = TypeConverter.fromYarn(context.getDAGId().getApplicationId());
+    jobConf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
+    return new MRJobContextImpl(jobConf, jobId);
+  }
+
+  private State getJobStateFromVertexStatusState(VertexStatus.State state) {
+    return JobStatus.State.valueOf(state.name());
+  }
+
+  @Override
+  public void init(VertexContext context) throws IOException {
+    committer = getOutputCommitter(context);
+    jobContext = getJobContextFromVertexContext(context);
+    initialized = true;
+  }
+
+  @Override
+  public void setupVertex() throws IOException {
+    if (!initialized) {
+      throw new RuntimeException("Committer not initialized");
+    }
+    committer.setupJob(jobContext);
+  }
+
+  @Override
+  public void commitVertex() throws IOException {
+    if (!initialized) {
+      throw new RuntimeException("Committer not initialized");
+    }
+    committer.commitJob(jobContext);
+  }
+
+  @Override
+  public void abortVertex(VertexStatus.State finalState) throws IOException {
+    if (!initialized) {
+      throw new RuntimeException("Committer not initialized");
+    }
+    State jobState = getJobStateFromVertexStatusState(finalState);
+    committer.abortJob(jobContext, jobState);
+  }
+
+  private static class MRJobContextImpl
+      extends org.apache.hadoop.mapred.JobContextImpl {
+
+    public MRJobContextImpl(JobConf jobConf, JobID jobId) {
+      super(jobConf, jobId);
+    }
+
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,100 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.engine.records.TezVertexID;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * Static helpers that build the environment and the launch command for the
+ * child JVM whose main class is {@link YarnTezDagChild}.
+ */
+public class MapReduceChildJVM2 {
+
+  /**
+   * Returns the location of the given log file, expressed relative to the
+   * container log directory expansion variable.
+   */
+  private static String getTaskLogFile(LogName filter) {
+    return ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR
+        + filter.toString();
+  }
+
+  /**
+   * Adds the application attempt id from {@code conf} to the child
+   * environment.
+   *
+   * @param environment environment map to populate
+   * @param conf job configuration; must define
+   *        {@code MRJobConfig.APPLICATION_ATTEMPT_ID}
+   * @param vertexId currently unused
+   * @throws NullPointerException if the application attempt id is not set
+   */
+  public static void setVMEnv(Map<String, String> environment, JobConf conf,
+      TezVertexID vertexId) {
+
+    // FIXME this should be derivable from the container id set by the NM
+    // and not require the AM to set
+    String appAttemptId = conf.get(MRJobConfig.APPLICATION_ATTEMPT_ID);
+    if (appAttemptId == null) {
+      // Same exception type as the previous implicit dereference, but with a
+      // message that names the missing key.
+      throw new NullPointerException(MRJobConfig.APPLICATION_ATTEMPT_ID
+          + " is not set in the job configuration");
+    }
+    environment.put(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV, appAttemptId);
+
+  }
+
+  /**
+   * Builds the shell command that launches the child JVM.
+   *
+   * <p>The individual arguments are joined with single spaces (each argument
+   * is followed by one space, including the last) into one string.
+   *
+   * @param taskAttemptListenerAddr address the child is told to connect to
+   * @param conf currently unused
+   * @param vertexId currently unused
+   * @param containerId container id passed as the last program argument
+   * @param jobID application id passed as a program argument
+   * @param shouldProfile currently has no effect (profiling is not
+   *        implemented)
+   * @return a single-element list holding the merged command line
+   */
+  public static List<String> getVMCommand(
+      InetSocketAddress taskAttemptListenerAddr, JobConf conf,
+      TezVertexID vertexId,
+      ContainerId containerId, ApplicationId jobID, boolean shouldProfile) {
+
+    List<String> vargs = new ArrayList<String>(10);
+
+    vargs.add("exec");
+    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+
+    // Add child (task) java-vm options.
+    // FIXME add support for child java opts
+
+    Path childTmpDir = new Path(Environment.PWD.$(),
+        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
+    vargs.add("-Djava.io.tmpdir=" + childTmpDir);
+
+    // FIXME Setup the log4j properties
+
+    // Decision to profile needs to be made in the scheduler.
+    if (shouldProfile) {
+      // FIXME add support for profiling
+    }
+
+    // Add main class and its arguments
+    vargs.add(YarnTezDagChild.class.getName());  // main of Child
+    // pass TaskAttemptListener's address
+    vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress());
+    vargs.add(Integer.toString(taskAttemptListenerAddr.getPort()));
+    // Set the job id
+    vargs.add(jobID.toString());
+
+    // Finally add the containerId.
+    vargs.add(containerId.toString());
+    vargs.add("1>" + getTaskLogFile(TaskLog.LogName.STDOUT));
+    vargs.add("2>" + getTaskLogFile(TaskLog.LogName.STDERR));
+
+    // Final command
+    StringBuilder mergedCommand = new StringBuilder();
+    for (String arg : vargs) {
+      mergedCommand.append(arg).append(" ");
+    }
+    List<String> vargsFinal = new ArrayList<String>(1);
+    vargsFinal.add(mergedCommand.toString());
+    return vargsFinal;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/TezMRTypeConverter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/TezMRTypeConverter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/TezMRTypeConverter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/TezMRTypeConverter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.engine.records.TezVertexID;
+
+public class TezMRTypeConverter {
+
+  // FIXME hack alert assumimg only one dag per application
+  public static TaskAttemptID fromTez(TezTaskAttemptID attemptId) {
+    TezTaskID taskId = attemptId.getTaskID();
+    TezVertexID vertexId = taskId.getVertexID();
+    TezDAGID dagId = vertexId.getDAGId();
+    return new TaskAttemptID(
+        Long.toString(dagId.getApplicationId().getClusterTimestamp()),
+        dagId.getApplicationId().getId(),
+        (vertexId.getId() == 0 ? TaskType.MAP : TaskType.REDUCE),
+        taskId.getId(), attemptId.getId());
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/TezMRTypeConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,30 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+/**
+ * A simple wrapper for increasing the visibility.
+ *
+ * <p>Adds nothing to {@link JVMId} beyond a public class and a public
+ * constructor that forwards its arguments unchanged to the superclass.
+ * NOTE(review): presumably {@code JVMId} or its constructor is not accessible
+ * outside this package - confirm against the Hadoop sources.
+ */
+public class WrappedJvmID extends JVMId {
+
+  /**
+   * Forwards all three arguments to the {@link JVMId} constructor.
+   *
+   * @param jobID passed through to the superclass constructor
+   * @param mapTask passed through to the superclass constructor
+   * @param nextInt passed through to the superclass constructor
+   */
+  public WrappedJvmID(JobID jobID, boolean mapTask, int nextInt) {
+    super(jobID, mapTask, nextInt);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * Workaround for {@link PeriodicStatsAccumulator} being package access:
+ * a public wrapper that forwards {@link #extend(double, int)} to the wrapped
+ * accumulator.
+ */
+public class WrappedPeriodicStatsAccumulator {
+
+  // The wrapped accumulator; assigned once in the constructor.
+  private final PeriodicStatsAccumulator delegate;
+
+  /**
+   * @param real the accumulator that all calls are forwarded to
+   */
+  public WrappedPeriodicStatsAccumulator(PeriodicStatsAccumulator real) {
+    this.delegate = real;
+  }
+
+  /**
+   * Forwards both arguments unchanged to
+   * {@code PeriodicStatsAccumulator.extend}.
+   *
+   * @param newProgress passed through to the wrapped accumulator
+   * @param newValue passed through to the wrapped accumulator
+   */
+  public void extend(double newProgress, int newValue) {
+    delegate.extend(newProgress, newValue);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+// Workaround for ProgressSplitBlock being package access
+public class WrappedProgressSplitsBlock extends ProgressSplitsBlock {
+  private WrappedPeriodicStatsAccumulator wrappedProgressWallclockTime;
+  private WrappedPeriodicStatsAccumulator wrappedProgressCPUTime;
+  private WrappedPeriodicStatsAccumulator wrappedProgressVirtualMemoryKbytes;
+  private WrappedPeriodicStatsAccumulator wrappedProgressPhysicalMemoryKbytes;
+
+  public WrappedProgressSplitsBlock(int numberSplits) {
+    super(numberSplits);
+  }
+
+  public int[][] burst() {
+    return super.burst();
+  }
+
+  public WrappedPeriodicStatsAccumulator getProgressWallclockTime() {
+    if (wrappedProgressWallclockTime == null) {
+      wrappedProgressWallclockTime = new WrappedPeriodicStatsAccumulator(
+          progressWallclockTime);
+    }
+    return wrappedProgressWallclockTime;
+  }
+
+  public WrappedPeriodicStatsAccumulator getProgressCPUTime() {
+    if (wrappedProgressCPUTime == null) {
+      wrappedProgressCPUTime = new WrappedPeriodicStatsAccumulator(
+          progressCPUTime);
+    }
+    return wrappedProgressCPUTime;
+  }
+
+  public WrappedPeriodicStatsAccumulator getProgressVirtualMemoryKbytes() {
+    if (wrappedProgressVirtualMemoryKbytes == null) {
+      wrappedProgressVirtualMemoryKbytes = new WrappedPeriodicStatsAccumulator(
+          progressVirtualMemoryKbytes);
+    }
+    return wrappedProgressVirtualMemoryKbytes;
+  }
+
+  public WrappedPeriodicStatsAccumulator getProgressPhysicalMemoryKbytes() {
+    if (wrappedProgressPhysicalMemoryKbytes == null) {
+      wrappedProgressPhysicalMemoryKbytes = new WrappedPeriodicStatsAccumulator(
+          progressPhysicalMemoryKbytes);
+    }
+    return wrappedProgressPhysicalMemoryKbytes;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,236 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRConfig;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from child space.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class YarnOutputFiles extends MapOutputFile {
+
+  private JobConf conf;
+
+  private static final String JOB_OUTPUT_DIR = "output";
+  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
+  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
+      + ".index";
+
+  public YarnOutputFiles() {
+  }
+
+  // assume configured to $localdir/usercache/$user/appcache/$appId
+  private LocalDirAllocator lDirAlloc = 
+    new LocalDirAllocator(MRConfig.LOCAL_DIR);
+
+  private Path getAttemptOutputDir() {
+    return new Path(JOB_OUTPUT_DIR, conf.get(JobContext.TASK_ATTEMPT_ID));
+  }
+  
+  /**
+   * Return the path to local map output file created earlier
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFile() throws IOException {
+    Path attemptOutput =
+      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output file name.
+   * 
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFileForWrite(long size) throws IOException {
+    Path attemptOutput = 
+      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir,
+        conf.get(JobContext.TASK_ATTEMPT_ID));
+    return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFile() throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING +
+                                      MAP_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   * 
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFileForWrite(long size) throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING +
+                                      MAP_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
+        size, conf);
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir,
+        conf.get(JobContext.TASK_ATTEMPT_ID));
+    return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING +
+                                      MAP_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   * 
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill file name.
+   * 
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(String.format(SPILL_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber)), size, conf);
+  }
+
+  /**
+   * Return a local map spill index file created earlier
+   * 
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_INDEX_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill index file name.
+   * 
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(SPILL_INDEX_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local reduce input file created earlier
+   * 
+   * @param mapId a map task id
+   * @return path
+   * @throws IOException 
+   */
+  public Path getInputFile(int mapId) throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  /**
+   * Create a local reduce input file name.
+   * 
+   * @param mapId a map task id
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
+      long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        REDUCE_INPUT_FILE_FORMAT_STRING,
+        getAttemptOutputDir().toString(), mapId.getId()),
+        size, conf);
+  }
+
+  /** Removes all of the files related to a task. */
+  public void removeAll() throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf) conf;
+    } else {
+      this.conf = new JobConf(conf);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,481 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.log4j.LogManager;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.mapreduce.hadoop.ContainerContext;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.processor.MRTask;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * The main() for TEZ MapReduce task processes.
+ */
+public class YarnTezDagChild {
+
+  private static final Log LOG = LogFactory.getLog(YarnTezDagChild.class);
+
+  public static void main(String[] args) throws Throwable {
+    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+    LOG.debug("Child starting");
+
+    DeprecatedKeys.init();
+    
+    final JobConf defaultConf = new JobConf();
+    // HACK Eventually load the DagConf for security etc setup.
+//    defaultConf.addResource(MRJobConfig.JOB_CONF_FILE);
+    UserGroupInformation.setConfiguration(defaultConf);
+
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+    final InetSocketAddress address =
+        NetUtils.createSocketAddrForHost(host, port);
+
+    final ApplicationId appID = ConverterUtils.toApplicationId(args[2]);
+    
+    final ContainerId containerId = ConverterUtils.toContainerId(args[3]);
+
+    // FIXME fix initialize metrics in child runner
+    DefaultMetricsSystem.initialize("VertexTask");
+
+    // Security framework already loaded the tokens into current ugi
+    Credentials credentials =
+        UserGroupInformation.getCurrentUser().getCredentials();
+    LOG.info("Executing with tokens:");
+    for (Token<?> token: credentials.getAllTokens()) {
+      LOG.info(token);
+    }
+
+    // Create TaskUmbilicalProtocol as actual task owner.
+    UserGroupInformation taskOwner =
+      UserGroupInformation.createRemoteUser(appID.toString());
+    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
+    SecurityUtil.setTokenService(jt, address);
+    taskOwner.addToken(jt);
+    final TezTaskUmbilicalProtocol umbilical =
+      taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
+      @Override
+      public TezTaskUmbilicalProtocol run() throws Exception {
+        return (TezTaskUmbilicalProtocol)RPC.getProxy(TezTaskUmbilicalProtocol.class,
+            TezTaskUmbilicalProtocol.versionID, address, defaultConf);
+      }
+    });
+
+    // report the JVM pid (JVM_PID env variable) to the application master
+    String pid = System.getenv().get("JVM_PID");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("PID, containerId: " + pid + ", " + containerId);
+    }
+    TezEngineTask taskContext = null;
+    ContainerTask containerTask = null;
+    UserGroupInformation childUGI = null;
+    TezTaskAttemptID taskAttemptId = null;
+    MRTask task = null;
+    ContainerContext containerContext = new ContainerContext(containerId, pid);
+    
+    try {
+      while (true) {
+        // poll for new task
+        for (int idle = 0; null == containerTask; ++idle) {
+          long sleepTimeMilliSecs = Math.min(idle * 500, 1500);
+          LOG.info("Sleeping for " + sleepTimeMilliSecs
+              + "ms before retrying again. Got null now.");
+          MILLISECONDS.sleep(sleepTimeMilliSecs);
+          containerTask = umbilical.getTask(containerContext);
+        }
+        LOG.info("TaskInfo: shouldDie: "
+            + containerTask.shouldDie()
+            + (containerTask.shouldDie() == true ? "" : ", taskAttemptId: "
+                + containerTask.getTezEngineTaskContext().getTaskAttemptId()));
+
+        if (containerTask.shouldDie()) {
+          return;
+        }
+        taskContext = containerTask.getTezEngineTaskContext();
+
+        taskAttemptId = taskContext.getTaskAttemptId();
+
+        final Task t = createAndConfigureTezTask(taskContext, umbilical,
+            credentials, jt);
+        task = (MRTask) t.getProcessor();
+        final JobConf job = task.getConf();
+
+        // Initiate Java VM metrics
+        JvmMetrics.initSingleton(containerId.toString(), job.getSessionId());
+        childUGI = UserGroupInformation.createRemoteUser(System
+            .getenv(ApplicationConstants.Environment.USER.toString()));
+        // Add tokens to new user so that it may execute its task correctly.
+        childUGI.addCredentials(credentials);
+
+        childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+          @Override
+          public Object run() throws Exception {
+            runTezTask(t, umbilical, job); // run the task
+            return null;
+          }
+        });
+        FileSystem.closeAllForUGI(childUGI);
+        containerTask = null;
+      }
+    } catch (FSError e) {
+      LOG.fatal("FSError from child", e);
+      umbilical.fsError(taskAttemptId, e.getMessage());
+    } catch (Exception exception) {
+      LOG.warn("Exception running child : "
+          + StringUtils.stringifyException(exception));
+      try {
+        if (task != null) {
+          // do cleanup for the task
+          if (childUGI == null) { // no need to jump into doAs block
+            task.taskCleanup(umbilical);
+          } else {
+            final MRTask taskFinal = task;
+            childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                taskFinal.taskCleanup(umbilical);
+                return null;
+              }
+            });
+          }
+        }
+      } catch (Exception e) {
+        LOG.info("Exception cleaning up: " + StringUtils.stringifyException(e));
+      }
+      // Report back any failures, for diagnostic purposes
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      exception.printStackTrace(new PrintStream(baos));
+      if (taskAttemptId != null) {
+        umbilical.fatalError(taskAttemptId, baos.toString());
+      }
+    } catch (Throwable throwable) {
+      LOG.fatal("Error running child : "
+    	        + StringUtils.stringifyException(throwable));
+      if (taskAttemptId != null) {
+        Throwable tCause = throwable.getCause();
+        String cause = tCause == null
+                                 ? throwable.getMessage()
+                                 : StringUtils.stringifyException(tCause);
+        umbilical.fatalError(taskAttemptId, cause);
+      }
+    } finally {
+      RPC.stopProxy(umbilical);
+      DefaultMetricsSystem.shutdown();
+      // Shutting down log4j of the child-vm...
+      // This assumes that on return from Task.run()
+      // there is no more logging done.
+      LogManager.shutdown();
+    }
+  }
+
+  /**
+   * Configure mapred-local dirs. This config is used by the task for finding
+   * out an output directory.
+   * @throws IOException 
+   */
+  private static void configureLocalDirs(MRTask task, JobConf job) throws IOException {
+    String[] localSysDirs = StringUtils.getTrimmedStrings(
+        System.getenv(ApplicationConstants.LOCAL_DIR_ENV));
+    job.setStrings(TezJobConfig.LOCAL_DIR, localSysDirs);
+    LOG.info(TezJobConfig.LOCAL_DIR + " for child: " +
+        job.get(TezJobConfig.LOCAL_DIR));
+    LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+    Path workDir = null;
+    // First, try to find the JOB_LOCAL_DIR on this host.
+    try {
+      workDir = lDirAlloc.getLocalPathToRead("work", job);
+    } catch (DiskErrorException e) {
+      // DiskErrorException means dir not found. If not found, it will
+      // be created below.
+    }
+    if (workDir == null) {
+      // JOB_LOCAL_DIR doesn't exist on this host -- Create it.
+      workDir = lDirAlloc.getLocalPathForWrite("work", job);
+      FileSystem lfs = FileSystem.getLocal(job).getRaw();
+      boolean madeDir = false;
+      try {
+        madeDir = lfs.mkdirs(workDir);
+      } catch (FileAlreadyExistsException e) {
+        // Since all tasks will be running in their own JVM, the race condition
+        // exists where multiple tasks could be trying to create this directory
+        // at the same time. If this task loses the race, it's okay because
+        // the directory already exists.
+        madeDir = true;
+        workDir = lDirAlloc.getLocalPathToRead("work", job);
+      }
+      if (!madeDir) {
+          throw new IOException("Mkdirs failed to create "
+              + workDir.toString());
+      }
+    }
+    // TODO TEZ This likely needs fixing to make sure things work when there are multiple local-dirs etc.
+    job.set(MRJobConfig.JOB_LOCAL_DIR,workDir.toString());
+  }
+
+  private static JobConf configureTask(MRTask task, Credentials credentials,
+      Token<JobTokenIdentifier> jt) throws IOException, InterruptedException {
+    JobConf job = task.getConf();
+    
+    String appAttemptIdEnv = System
+        .getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
+    LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);
+    // Set it in conf, so that it can be used by the OutputCommitter.
+    job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer
+        .parseInt(appAttemptIdEnv));
+
+    // set tcp nodelay
+    job.setBoolean("ipc.client.tcpnodelay", true);
+    job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
+        YarnOutputFiles.class, MapOutputFile.class);
+    // set the job token secret into the task
+    SecretKey sk = JobTokenSecretManager.createSecretKey(jt.getPassword());
+
+    task.setJobTokenSecret(sk);
+//    task.setJobTokenSecret(
+//        JobTokenSecretManager.createSecretKey(jt.getPassword()));
+
+    // set up the child's local dirs (TezJobConfig.LOCAL_DIR, MRJobConfig.JOB_LOCAL_DIR).
+    configureLocalDirs(task, job);
+
+    // setup the child's attempt directories
+    // Do the task-type specific localization
+    task.localizeConfiguration(job);
+
+    // Set up the DistributedCache related configs
+    setupDistributedCacheConfig(job);
+
+    // Overwrite the localized task jobconf which is linked to in the current
+    // work-dir.
+    Path localTaskFile = new Path(MRJobConfig.JOB_CONF_FILE);
+    writeLocalJobFile(localTaskFile, job);
+    task.setConf(job);
+    return job;
+  }
+
+  /**
+   * Set up the DistributedCache related configs to make
+   * {@link DistributedCache#getLocalCacheFiles(Configuration)}
+   * and
+   * {@link DistributedCache#getLocalCacheArchives(Configuration)}
+   * working.
+   * @param job
+   * @throws IOException
+   */
+  private static void setupDistributedCacheConfig(final JobConf job)
+      throws IOException {
+
+    String localWorkDir = System.getenv("PWD");
+    //        ^ ^ all symlinks are created in the current work-dir
+
+    // Update the configuration object with localized archives.
+    URI[] cacheArchives = DistributedCache.getCacheArchives(job);
+    if (cacheArchives != null) {
+      List<String> localArchives = new ArrayList<String>();
+      for (int i = 0; i < cacheArchives.length; ++i) {
+        URI u = cacheArchives[i];
+        Path p = new Path(u);
+        Path name =
+            new Path((null == u.getFragment()) ? p.getName()
+                : u.getFragment());
+        String linkName = name.toUri().getPath();
+        localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
+      }
+      if (!localArchives.isEmpty()) {
+        job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
+            .arrayToString(localArchives.toArray(new String[localArchives
+                .size()])));
+      }
+    }
+
+    // Update the configuration object with localized files.
+    URI[] cacheFiles = DistributedCache.getCacheFiles(job);
+    if (cacheFiles != null) {
+      List<String> localFiles = new ArrayList<String>();
+      for (int i = 0; i < cacheFiles.length; ++i) {
+        URI u = cacheFiles[i];
+        Path p = new Path(u);
+        Path name =
+            new Path((null == u.getFragment()) ? p.getName()
+                : u.getFragment());
+        String linkName = name.toUri().getPath();
+        localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
+      }
+      if (!localFiles.isEmpty()) {
+        job.set(MRJobConfig.CACHE_LOCALFILES,
+            StringUtils.arrayToString(localFiles
+                .toArray(new String[localFiles.size()])));
+      }
+    }
+  }
+
+  private static final FsPermission urw_gr =
+    FsPermission.createImmutable((short) 0640);
+
+  /**
+   * Write the task specific job-configuration file.
+   * @throws IOException
+   */
+  private static void writeLocalJobFile(Path jobFile, JobConf conf)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    localFs.delete(jobFile);
+    OutputStream out = null;
+    try {
+      out = FileSystem.create(localFs, jobFile, urw_gr);
+      conf.writeXml(out);
+    } finally {
+      IOUtils.cleanup(LOG, out);
+    }
+  }
+
+  private static Task createAndConfigureTezTask(
+      TezEngineTask taskContext,
+      TezTaskUmbilicalProtocol master, 
+      Credentials credentials, Token<JobTokenIdentifier> jt) 
+      throws IOException, InterruptedException {
+    Configuration jConf = new JobConf(MRJobConfig.JOB_CONF_FILE);
+    Configuration conf;
+    
+    // TODO Post MRR. This structure will not allow randomly named vertices.
+    // Have the MRR client convert intermediate stage configuration to be based
+    // on vertex name.
+    // A single file per vertex will likely be a better solution. Does not
+    // require translation - client can take care of this. Will work independent
+    // of whether the configuration is for intermediate tasks or not. Has the
+    // overhead of localizing multiple files per job - i.e. the client would
+    // need to write these files to hdfs, add them as local resources per
+    // vertex. A solution like this may be more practical once it's possible to
+    // submit configuration parameters to the AM and effectively tasks via RPC.
+    LOG.info("DEBUG: VertexName: " + taskContext.getVertexName());
+    if (MultiStageMRConfigUtil.isIntermediateReduceStage(taskContext
+        .getVertexName())) {
+      LOG.info("DEBUG: is intermediate stage");
+      int intermediateStageNum = MultiStageMRConfigUtil
+          .getIntermediateReduceStageNum(taskContext.getVertexName());
+      LOG.info("DEBUG: intermediateStageNum: " + intermediateStageNum);
+      conf = MultiStageMRConfigUtil.getIntermediateStageConf(jConf,
+          intermediateStageNum);
+      MultiStageMRConfigUtil.printConf(conf);
+    } else {
+      conf = jConf;
+    }
+
+    // TODO Avoid all this extra config manipulation.
+    final JobConf job = new JobConf(conf);
+    job.setCredentials(credentials);
+    
+    // Create the appropriate guice task-module
+    AbstractModule taskModule = null;
+    LOG.info("Using Module: " + taskContext.getTaskModuleClassName());
+    try {
+      Class<?> moduleClazz = Class
+          .forName(taskContext.getTaskModuleClassName());
+      if (AbstractModule.class.isAssignableFrom(moduleClazz)) {
+        taskModule = (AbstractModule) ReflectionUtils.newInstance(moduleClazz,
+            job);
+      } else {
+        throw new YarnException("Module class: " + moduleClazz.getName()
+            + " should be an instance of "
+            + AbstractModule.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new YarnException("Unable to load moduleClass: "
+          + taskContext.getTaskModuleClassName(), e);
+    }
+
+    // Use the injector to create & bind input, processor, output & task
+    Injector injector = Guice.createInjector(taskModule);
+    TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+    Task t = factory.createTask(taskContext);
+    t.initialize(job, master);
+    
+    MRTask task = (MRTask)t.getProcessor();
+    configureTask(task, credentials, jt);
+    
+    return t;
+  }
+  
+  private static void runTezTask(
+      Task t, TezTaskUmbilicalProtocol master, JobConf job) 
+  throws IOException, InterruptedException {
+    // use job-specified working directory
+    FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
+    
+    // Run!
+    t.run();
+    t.close();
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/client/impl/TezBuilderUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/client/impl/TezBuilderUtils.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/client/impl/TezBuilderUtils.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/client/impl/TezBuilderUtils.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client.impl;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.records.AMInfo;
+import org.apache.tez.dag.app.dag.DAGReport;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.engine.records.TezVertexID;
+
+public class TezBuilderUtils {
+
+  public static TezVertexID newVertexID(TezDAGID dagId, int vertexId) {
+    return new TezVertexID(dagId, vertexId);
+  }
+
+  public static TezTaskAttemptID newTaskAttemptId(TezTaskID taskId, int id) {
+    return new TezTaskAttemptID(taskId, id);
+  }
+  
+  public static DAGReport newDAGReport() {
+    return null;
+  }
+
+  public static AMInfo newAMInfo(ApplicationAttemptId appAttemptID, 
+      long startTime, ContainerId containerID, String nmHost, 
+      int nmPort, int nmHttpPort) {
+    return null;
+  }
+
+  public static TezTaskID newTaskId(TezDAGID dagId, int vertexId, int taskId) {
+    return new TezTaskID(newVertexID(dagId, vertexId), taskId);
+  }
+
+}
\ No newline at end of file

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/client/impl/TezBuilderUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/NullVertexOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/NullVertexOutputCommitter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/NullVertexOutputCommitter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/NullVertexOutputCommitter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.impl;
+
+import java.io.IOException;
+
+import org.apache.tez.dag.api.client.VertexStatus.State;
+
+public class NullVertexOutputCommitter extends VertexOutputCommitter {
+
+  @Override
+  public void init(VertexContext context) throws IOException {
+    // Nothing to do
+  }
+
+  @Override
+  public void setupVertex() throws IOException {
+    // Nothing to do
+  }
+
+  @Override
+  public void commitVertex() throws IOException {
+    // Nothing to do
+  }
+
+  @Override
+  public void abortVertex(State finalState) throws IOException {
+    // Nothing to do
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/NullVertexOutputCommitter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexContext.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexContext.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexContext.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezVertexID;
+
+public interface VertexContext {
+
+  public Configuration getConf();
+
+  public TezDAGID getDAGId();
+
+  public TezVertexID getVertexId();
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexContext.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexOutputCommitter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexOutputCommitter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexOutputCommitter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.tez.dag.api.client.VertexStatus;
+
+@Public
+@Unstable
+public abstract class VertexOutputCommitter {
+
+  /**
+   * Set up the vertex output committer.
+   *
+   * @param context Context of the vertex whose output is being written.
+   * @throws IOException
+   */
+  public abstract void init(VertexContext context) throws IOException;
+
+  /**
+   * For the framework to set up the vertex output during initialization. This is
+   * called from the application master process for the vertex. This will be
+   * called multiple times, once per dag attempt for each vertex.
+   *
+   * @throws IOException if setup fails
+   */
+  public abstract void setupVertex() throws IOException;
+
+  /**
+   * For committing vertex's output after successful vertex completion.
+   * Note that this is invoked for vertices with a successful final state.
+   * This is called from the application master process for the entire vertex.
+   * This is guaranteed to only be called once.
+   * If it throws an exception the entire vertex will fail.
+   *
+   * @throws IOException
+   */
+  public abstract void commitVertex() throws IOException;
+
+  /**
+   * For aborting an unsuccessful vertex's output. Note that this is invoked for
+   * vertices with a final failed state. This is called from the application
+   * master process for the entire vertex. This may be called multiple times.
+   *
+   * @param finalState final run state of the vertex
+   * @throws IOException
+   */
+  public abstract void abortVertex(VertexStatus.State finalState)
+      throws IOException;
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexOutputCommitter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/AMInfo.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/AMInfo.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/AMInfo.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/AMInfo.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.records;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public interface AMInfo {
+  public ApplicationAttemptId getAppAttemptId();
+  public long getStartTime();
+  public ContainerId getContainerId();
+  public String getNodeManagerHost();
+  public int getNodeManagerPort();
+  public int getNodeManagerHttpPort();
+
+  public void setAppAttemptId(ApplicationAttemptId appAttemptId);
+  public void setStartTime(long startTime);
+  public void setContainerId(ContainerId containerId);
+  public void setNodeManagerHost(String nmHost);
+  public void setNodeManagerPort(int nmPort);
+  public void setNodeManagerHttpPort(int mnHttpPort);
+}
\ No newline at end of file

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/AMInfo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskAttemptReport.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskAttemptReport.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskAttemptReport.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskAttemptReport.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.records;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * Mutable report describing a single task attempt: its id, state, progress,
+ * timing, counters, diagnostics and the container / node manager recorded for
+ * it. Every property has a getter and a matching setter.
+ */
+public interface TaskAttemptReport {
+  /** @return the id of the task attempt this report describes. */
+  TezTaskAttemptID getTaskAttemptId();
+
+  /** @return the state recorded for the attempt. */
+  TaskAttemptState getTaskAttemptState();
+
+  /** @return the recorded progress value (range is not fixed by this interface). */
+  float getProgress();
+
+  /** @return the recorded start time. */
+  long getStartTime();
+
+  /** @return the recorded finish time. */
+  long getFinishTime();
+
+  /** @return the shuffle finish time. Applicable only for reduce attempts. */
+  long getShuffleFinishTime();
+
+  /** @return the sort/merge finish time. Applicable only for reduce attempts. */
+  long getSortFinishTime();
+
+  /** @return the counters recorded for the attempt. */
+  TezCounters getCounters();
+
+  /** @return the diagnostic text recorded for the attempt. */
+  String getDiagnosticInfo();
+
+  /** @return the free-form state string recorded for the attempt. */
+  String getStateString();
+
+  /** @return the recorded node manager host. */
+  String getNodeManagerHost();
+
+  /** @return the recorded node manager port. */
+  int getNodeManagerPort();
+
+  /** @return the recorded node manager HTTP port. */
+  int getNodeManagerHttpPort();
+
+  /** @return the recorded container id. */
+  ContainerId getContainerId();
+
+  /** @param taskAttemptId the id of the task attempt this report describes. */
+  void setTaskAttemptId(TezTaskAttemptID taskAttemptId);
+
+  /** @param taskAttemptState the state to record for the attempt. */
+  void setTaskAttemptState(TaskAttemptState taskAttemptState);
+
+  /** @param progress the progress value to record. */
+  void setProgress(float progress);
+
+  /** @param startTime the start time to record. */
+  void setStartTime(long startTime);
+
+  /** @param finishTime the finish time to record. */
+  void setFinishTime(long finishTime);
+
+  /** @param counters the counters to record for the attempt. */
+  void setCounters(TezCounters counters);
+
+  /** @param diagnosticInfo the diagnostic text to record. */
+  void setDiagnosticInfo(String diagnosticInfo);
+
+  /** @param stateString the free-form state string to record. */
+  void setStateString(String stateString);
+
+  /** @param nmHost the node manager host to record. */
+  void setNodeManagerHost(String nmHost);
+
+  /** @param nmPort the node manager port to record. */
+  void setNodeManagerPort(int nmPort);
+
+  /** @param nmHttpPort the node manager HTTP port to record. */
+  void setNodeManagerHttpPort(int nmHttpPort);
+
+  /** @param containerId the container id to record. */
+  void setContainerId(ContainerId containerId);
+
+  /**
+   * Set the shuffle finish time. Applicable only for reduce attempts.
+   * @param time the time the shuffle finished.
+   */
+  void setShuffleFinishTime(long time);
+
+  /**
+   * Set the sort/merge finish time. Applicable only for reduce attempts.
+   * @param time the time the sort/merge finished.
+   */
+  void setSortFinishTime(long time);
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskAttemptReport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskAttemptState.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskAttemptState.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskAttemptState.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskAttemptState.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.records;
+
+/**
+ * States that can be recorded for a task attempt; this is the type returned by
+ * {@link TaskAttemptReport#getTaskAttemptState()}.
+ */
+public enum TaskAttemptState {
+  NEW,
+  STARTING,
+  RUNNING,
+  COMMIT_PENDING,
+  SUCCEEDED,
+  FAILED,
+  KILLED
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskAttemptState.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskReport.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskReport.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskReport.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskReport.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.records;
+
+import java.util.List;
+
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+
+/**
+ * Mutable report describing a task: its id, state, progress, timing and
+ * counters, plus a list of running attempt ids, the successful attempt id and
+ * a list of diagnostic strings.
+ * NOTE(review): the indexed and count accessors presumably refer to positions
+ * in the corresponding list accessor; confirm against the implementation.
+ */
+public interface TaskReport {
+  /** @return the id of the task this report describes. */
+  TezTaskID getTaskId();
+
+  /** @return the state recorded for the task. */
+  TaskState getTaskState();
+
+  /** @return the recorded progress value (range is not fixed by this interface). */
+  float getProgress();
+
+  /** @return the recorded start time. */
+  long getStartTime();
+
+  /** @return the recorded finish time. */
+  long getFinishTime();
+
+  /** @return the counters recorded for the task. */
+  TezCounters getCounters();
+
+  /** @return the ids of the running attempts. */
+  List<TezTaskAttemptID> getRunningAttemptsList();
+
+  /**
+   * @param index position of the requested running attempt
+   * @return the running attempt id at {@code index}
+   */
+  TezTaskAttemptID getRunningAttempt(int index);
+
+  /** @return the number of running attempts. */
+  int getRunningAttemptsCount();
+
+  /** @return the id of the successful attempt. */
+  TezTaskAttemptID getSuccessfulAttempt();
+
+  /** @return the recorded diagnostic strings. */
+  List<String> getDiagnosticsList();
+
+  /**
+   * @param index position of the requested diagnostic string
+   * @return the diagnostic string at {@code index}
+   */
+  String getDiagnostics(int index);
+
+  /** @return the number of diagnostic strings. */
+  int getDiagnosticsCount();
+
+  /** @param taskId the id of the task this report describes. */
+  void setTaskId(TezTaskID taskId);
+
+  /** @param taskState the state to record for the task. */
+  void setTaskState(TaskState taskState);
+
+  /** @param progress the progress value to record. */
+  void setProgress(float progress);
+
+  /** @param startTime the start time to record. */
+  void setStartTime(long startTime);
+
+  /** @param finishTime the finish time to record. */
+  void setFinishTime(long finishTime);
+
+  /** @param counters the counters to record for the task. */
+  void setCounters(TezCounters counters);
+
+  /** @param taskAttempts the attempt ids to add to the running attempts. */
+  void addAllRunningAttempts(List<TezTaskAttemptID> taskAttempts);
+
+  /** @param taskAttempt the attempt id to add to the running attempts. */
+  void addRunningAttempt(TezTaskAttemptID taskAttempt);
+
+  /** @param index position of the running attempt to remove. */
+  void removeRunningAttempt(int index);
+
+  /** Removes all running attempts from this report. */
+  void clearRunningAttempts();
+
+  /** @param taskAttempt the id of the successful attempt. */
+  void setSuccessfulAttempt(TezTaskAttemptID taskAttempt);
+
+  /** @param diagnostics the diagnostic strings to add. */
+  void addAllDiagnostics(List<String> diagnostics);
+
+  /** @param diagnostics the single diagnostic string to add. */
+  void addDiagnostics(String diagnostics);
+
+  /** @param index position of the diagnostic string to remove. */
+  void removeDiagnostics(int index);
+
+  /** Removes all diagnostic strings from this report. */
+  void clearDiagnostics();
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskReport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskState.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskState.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskState.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskState.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.records;
+
+/**
+ * States that can be recorded for a task; this is the type returned by
+ * {@link TaskReport#getTaskState()}.
+ */
+public enum TaskState {
+  NEW,
+  SCHEDULED,
+  RUNNING,
+  SUCCEEDED,
+  FAILED,
+  KILLED
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/records/TaskState.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,70 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.apache.tez.dag.app.rm.node.AMNodeMap;
+import org.apache.tez.engine.records.TezDAGID;
+
+
+/**
+ * Context interface for sharing information across components in Tez DAG.
+ * Apart from {@link #setDAG(DAG)}, every member is a read accessor.
+ */
+@InterfaceAudience.Private
+public interface AppContext {
+
+  /** @return the id of the application. */
+  ApplicationId getApplicationID();
+
+  /** @return the DAG id (presumably that of the DAG returned by {@link #getDAG()}; confirm). */
+  TezDAGID getDAGID();
+
+  /** @return the id of the application attempt. */
+  ApplicationAttemptId getApplicationAttemptId();
+
+  /** @return the name of the application. */
+  String getApplicationName();
+  
+  /** @return the application ACLs, keyed by access type. */
+  Map<ApplicationAccessType, String> getApplicationACLs();
+
+  /** @return the start time (units are not fixed by this interface). */
+  long getStartTime();
+
+  /** @return the user, as a {@link CharSequence}. */
+  CharSequence getUser();
+
+  /** @return the DAG held by this context. */
+  DAG getDAG();
+
+  /** @param dag the DAG this context should hold. */
+  void setDAG(DAG dag);
+
+  /** @return the event handler; declared as a raw type, hence the suppressed warning. */
+  @SuppressWarnings("rawtypes")
+  EventHandler getEventHandler();
+
+  /** @return the clock shared through this context. */
+  Clock getClock();
+  
+  /** @return the cluster information shared through this context. */
+  ClusterInfo getClusterInfo();
+  
+  /** @return the map of all containers. */
+  AMContainerMap getAllContainers();
+  
+  /** @return the map of all nodes. */
+  AMNodeMap getAllNodes();
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,60 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+public class ContainerHeartbeatHandler extends
+    HeartbeatHandlerBase<ContainerId> {
+
+ 
+  public ContainerHeartbeatHandler(AppContext context, 
+      int numThreads) {
+    super(context, numThreads, "ContainerHeartbeatHandler");
+  }
+
+  @Override
+  protected int getConfiguredTimeout(Configuration conf) {
+    // TODO Maybe define separate timeouts for Containers and tasks.
+    return conf.getInt(MRJobConfig.TASK_TIMEOUT, 5 * 60 * 1000);
+  }
+
+  @Override
+  protected int getConfiguredTimeoutCheckInterval(Configuration conf) {
+    return conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000);
+  }
+
+  @Override
+  protected boolean hasTimedOut(ReportTime report, long currentTime) {
+    return (timeOut > 0) && (currentTime > report.getLastPing() + timeOut);
+
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  protected void handleTimeOut(ContainerId containerId) {
+    eventHandler.handle(new AMContainerEvent(containerId,
+        AMContainerEventType.C_TIMED_OUT));
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/ControlledClock.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/ControlledClock.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/ControlledClock.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/ControlledClock.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tez.dag.app;
+
+import org.apache.hadoop.yarn.Clock;
+
+/**
+ * {@link Clock} wrapper whose reported time can be pinned to a fixed value.
+ * While no value is pinned, {@link #getTime()} forwards to the wrapped clock.
+ * All accessors are {@code synchronized} on this instance.
+ */
+public class ControlledClock implements Clock {
+  /** Value held in {@code time} while no fixed time is in effect. */
+  private static final long UNSET = -1;
+
+  private final Clock actualClock;
+  private long time = UNSET;
+
+  /** @param actualClock the clock consulted while no fixed time is set. */
+  public ControlledClock(Clock actualClock) {
+    this.actualClock = actualClock;
+  }
+
+  /**
+   * Pins the reported time. Passing {@code -1} has the same effect as
+   * {@link #reset()}, because {@code -1} is the "not set" marker.
+   *
+   * @param time the value {@link #getTime()} should return from now on
+   */
+  public synchronized void setTime(long time) {
+    this.time = time;
+  }
+
+  /** Clears any pinned time so {@link #getTime()} uses the wrapped clock again. */
+  public synchronized void reset() {
+    time = UNSET;
+  }
+
+  /** @return the pinned time if one is set, otherwise the wrapped clock's time. */
+  @Override
+  public synchronized long getTime() {
+    return (time == UNSET) ? actualClock.getTime() : time;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/ControlledClock.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message