tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject [1/2] TEZ-341. Allow DAG to be submitted to an AM after it has started. (bikas and hitesh via hitesh)
Date Wed, 21 Aug 2013 02:12:29 GMT
Updated Branches:
  refs/heads/master a89057105 -> 9640b6f40


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
index dc1e45c..4dea478 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
@@ -107,11 +107,11 @@ public class DefaultSpeculator extends AbstractService implements
   public DefaultSpeculator(Configuration conf, AppContext context, Clock clock) {
     this(conf, context, getEstimator(conf, context), clock);
   }
-  
+
   static private TaskRuntimeEstimator getEstimator
       (Configuration conf, AppContext context) {
     TaskRuntimeEstimator estimator;
-    
+
     try {
       // "yarn.mapreduce.job.task.runtime.estimator.class"
       Class<? extends TaskRuntimeEstimator> estimatorClass
@@ -138,7 +138,7 @@ public class DefaultSpeculator extends AbstractService implements
       LOG.error("Can't make a speculation runtime extimator", ex);
       throw new TezUncheckedException(ex);
     }
-    
+
   return estimator;
   }
 
@@ -269,7 +269,7 @@ public class DefaultSpeculator extends AbstractService implements
             (event.getReportedStatus(), event.getTimestamp());
         break;
       }
-      
+
       case JOB_CREATE:
       {
         LOG.info("JOB_CREATE " + event.getJobID());
@@ -293,7 +293,7 @@ public class DefaultSpeculator extends AbstractService implements
 
     TezTaskAttemptID attemptID = reportedStatus.id;
     TezTaskID taskID = attemptID.getTaskID();
-    DAG job = context.getDAG();
+    DAG job = context.getCurrentDAG();
 
     if (job == null) {
       return;
@@ -338,7 +338,7 @@ public class DefaultSpeculator extends AbstractService implements
   // All of these values are negative.  Any value that should be allowed to
   //  speculate is 0 or positive.
   private long speculationValue(TezTaskID taskID, long now) {
-    DAG job = context.getDAG();
+    DAG job = context.getCurrentDAG();
     Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
     Map<TezTaskAttemptID, TaskAttempt> attempts = task.getAttempts();
     long acceptableRuntime = Long.MIN_VALUE;
@@ -452,9 +452,9 @@ public class DefaultSpeculator extends AbstractService implements
       int numberRunningTasks = 0;
 
       // loop through the tasks of the kind
-      DAG job = context.getDAG();
+      DAG job = context.getCurrentDAG();
 
-      Map<TezTaskID, Task> tasks = 
+      Map<TezTaskID, Task> tasks =
           job.getVertex(TezBuilderUtils.newVertexID(job.getID(), vertexId)).getTasks();
 
       int numberAllowedSpeculativeTasks

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
index 34bb4e3..b7ebc68 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
@@ -41,11 +41,11 @@ public class LegacyTaskRuntimeEstimator extends StartEndTimesBase {
   @Override
   public void updateAttempt(TaskAttemptStatus status, long timestamp) {
     super.updateAttempt(status, timestamp);
-    
+
 
     TezTaskAttemptID attemptID = status.id;
     TezTaskID taskID = attemptID.getTaskID();
-    DAG job = context.getDAG();
+    DAG job = context.getCurrentDAG();
 
     if (job == null) {
       return;
@@ -113,7 +113,7 @@ public class LegacyTaskRuntimeEstimator extends StartEndTimesBase {
   private long storedPerAttemptValue
        (Map<TaskAttempt, AtomicLong> data, TezTaskAttemptID attemptID) {
     TezTaskID taskID = attemptID.getTaskID();
-    DAG job = context.getDAG();
+    DAG job = context.getCurrentDAG();
 
     Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
index ba5bfda..8d36f28 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
@@ -84,7 +84,7 @@ abstract class StartEndTimesBase<V> implements TaskRuntimeEstimator {
     this.context = context;
 
 
-    final DAG dag = context.getDAG();
+    final DAG dag = context.getCurrentDAG();
     for (Entry<TezVertexID, Vertex> entry: dag.getVertices().entrySet()) {
       vertexStatistics.put(entry.getKey(), new DataStatistics());
       slowTaskRelativeTresholds =
@@ -93,7 +93,7 @@ abstract class StartEndTimesBase<V> implements TaskRuntimeEstimator {
   }
 
   protected DataStatistics dataStatisticsForTask(TezTaskID taskID) {
-    DAG dag = context.getDAG();
+    DAG dag = context.getCurrentDAG();
 
     if (dag == null) {
       return null;
@@ -110,14 +110,14 @@ abstract class StartEndTimesBase<V> implements TaskRuntimeEstimator {
 
   @Override
   public long thresholdRuntime(TezTaskID taskID) {
-    DAG job = context.getDAG();
+    DAG job = context.getCurrentDAG();
 
     DataStatistics statistics = dataStatisticsForTask(taskID);
 
     Vertex v = job.getVertex(taskID.getVertexID());
     int completedTasksOfType = v.getCompletedTasks();
     int totalTasksOfType = v.getTotalTasks();
-    
+
     if (completedTasksOfType < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
         || (((float)completedTasksOfType) / totalTasksOfType)
               < MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE ) {
@@ -146,7 +146,7 @@ abstract class StartEndTimesBase<V> implements TaskRuntimeEstimator {
 
     TezTaskAttemptID attemptID = status.id;
     TezTaskID taskID = attemptID.getTaskID();
-    DAG job = context.getDAG();
+    DAG job = context.getCurrentDAG();
 
     if (job == null) {
       return;
@@ -160,7 +160,7 @@ abstract class StartEndTimesBase<V> implements TaskRuntimeEstimator {
 
     Long boxedStart = startTimes.get(attemptID);
     long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
-    
+
     TaskAttempt taskAttempt = task.getAttempt(attemptID);
 
     if (taskAttempt.getState() == TaskAttemptState.SUCCEEDED) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java
index 33ea09f..ae2c612 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java
@@ -31,7 +31,7 @@ public class TaskSpeculationPredicate {
     //  Subclasses should call TaskSpeculationPredicate.canSpeculate(...) , but
     //  can be even more restrictive.
     // TODO handle multiple dags
-    DAG job = context.getDAG();
+    DAG job = context.getCurrentDAG();
     Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
     return task.getAttempts().size() == 1;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index f868650..3543259 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -49,10 +49,10 @@ implements EventHandler<DAGHistoryEvent> {
 
   @Override
   public void handle(DAGHistoryEvent event) {
-    TezDAGID dagId = context.getDAGID();
+    TezDAGID dagId = context.getCurrentDAGID();
     String dagIdStr = "N/A";
     if(dagId != null) {
-      dagIdStr = context.getDAGID().toString();
+      dagIdStr = context.getCurrentDAGID().toString();
     }
     LOG.info("[HISTORY]"
         + "[DAG:" + dagIdStr + "]"

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index efe37bf..acf40b1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -488,11 +488,11 @@ public class TestDAGImpl {
     jobTokenSecretManager = new JobTokenSecretManager();
     appContext = mock(AppContext.class);
     doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
-    doReturn(dagId).when(appContext).getDAGID();
+    doReturn(dagId).when(appContext).getCurrentDAGID();
     dag = new DAGImpl(dagId, conf, dagPlan,
         dispatcher.getEventHandler(),  taskAttemptListener,
         jobTokenSecretManager, fsTokens, clock, "user", thh, appContext);
-    doReturn(dag).when(appContext).getDAG();
+    doReturn(dag).when(appContext).getCurrentDAG();
     mrrAppContext = mock(AppContext.class);
     mrrDagId = new TezDAGID(appAttemptId.getApplicationId(), 2);
     mrrDagPlan = createTestMRRDAGPlan();
@@ -500,7 +500,7 @@ public class TestDAGImpl {
         dispatcher.getEventHandler(),  taskAttemptListener,
         jobTokenSecretManager, fsTokens, clock, "user", thh,
         mrrAppContext);
-    doReturn(mrrDag).when(mrrAppContext).getDAG();
+    doReturn(mrrDag).when(mrrAppContext).getCurrentDAG();
     doReturn(appAttemptId).when(mrrAppContext).getApplicationAttemptId();
     taskEventDispatcher = new TaskEventDispatcher();
     dispatcher.register(TaskEventType.class, taskEventDispatcher);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index f2a7be7..6022a6d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -518,9 +518,9 @@ public class TestVertexImpl {
     appContext = mock(AppContext.class);
     DAG dag = mock(DAG.class);
     doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
-    doReturn(dag).when(appContext).getDAG();
+    doReturn(dag).when(appContext).getCurrentDAG();
     doReturn(DAGPlan.getDefaultInstance()).when(dag).getJobPlan();
-    doReturn(dagId).when(appContext).getDAGID();
+    doReturn(dagId).when(appContext).getCurrentDAGID();
     doReturn(dagId).when(dag).getID();
     setupVertices();
     edges = DagTypeConverters.createEdgePropertyMapFromDAGPlan(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 9b6998b..922561d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -91,7 +91,7 @@ public class TestContainerReuse {
     AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
     doReturn(amNodeMap).when(appContext).getAllNodes();
-    doReturn(dagID).when(appContext).getDAGID();
+    doReturn(dagID).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
 
     TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient);
@@ -190,7 +190,7 @@ public class TestContainerReuse {
     AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
     doReturn(amNodeMap).when(appContext).getAllNodes();
-    doReturn(dagID).when(appContext).getDAGID();
+    doReturn(dagID).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
 
     TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient);
@@ -284,7 +284,7 @@ public class TestContainerReuse {
     AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
     doReturn(amNodeMap).when(appContext).getAllNodes();
-    doReturn(dagID).when(appContext).getDAGID();
+    doReturn(dagID).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
 
     TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient);
@@ -410,4 +410,4 @@ public class TestContainerReuse {
   }
 
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index df489ae..b573a8f 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -743,7 +744,7 @@ public class MRRSleepJob extends Configured implements Tool {
             TezConfiguration.TEZ_AM_STAGING_DIR,
             TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT),
             appId.toString()));
-    tezClient.ensureExists(remoteStagingDir);
+    TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
 
     DAG dag = createDAG(remoteFs, conf, appId, remoteStagingDir,
         numMapper, numReducer, iReduceStagesCount, numIReducer,
@@ -755,7 +756,7 @@ public class MRRSleepJob extends Configured implements Tool {
 
     DAGClient dagClient =
         tezClient.submitDAGApplication(appId, dag, remoteStagingDir,
-            null, null, amArgs , null, null, conf);
+            null, null, dag.getName(), amArgs , null, null, conf);
 
     while (true) {
       DAGStatus status = dagClient.getDAGStatus();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 3edb73a..f1e39be 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -19,30 +19,62 @@
 package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.StringTokenizer;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ClassUtil;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
+import org.apache.tez.dag.api.EdgeProperty.SourceType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 
 /**
  * An MRR job built on top of word count to return words sorted by
@@ -108,75 +140,222 @@ public class OrderedWordCount {
       System.err.println("Usage: wordcount <in> <out>");
       System.exit(2);
     }
+    String inputPath = otherArgs[0];
+    String outputPath = otherArgs[1];
 
-    // Configure intermediate reduces
-    conf.setInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 1);
-
-    // Set reducer class for intermediate reduce
-    conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
-        MRJobConfig.REDUCE_CLASS_ATTR), IntSumReducer.class, Reducer.class);
-    // Set reducer output key class
-    conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
-        MRJobConfig.MAP_OUTPUT_KEY_CLASS), IntWritable.class, Object.class);
-    // Set reducer output value class
-    conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
-        MRJobConfig.MAP_OUTPUT_VALUE_CLASS), Text.class, Object.class);
-    conf.setInt(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
-        MRJobConfig.NUM_REDUCES), 2);
-
-    @SuppressWarnings("deprecation")
-    Job job = new Job(conf, "orderedwordcount");
-    job.setJarByClass(OrderedWordCount.class);
-
-    // Configure map
-    job.setMapperClass(TokenizerMapper.class);
-    job.setMapOutputKeyClass(Text.class);
-    job.setMapOutputValueClass(IntWritable.class);
-
-    // Configure reduce
-    job.setReducerClass(MyOrderByNoOpReducer.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(IntWritable.class);
-
-    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
-    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
-
-    TezClient tezClient = new TezClient(new TezConfiguration(conf));
-
-    job.submit();
-    JobID jobId = job.getJobID();
-    ApplicationId appId = TypeConverter.toYarn(jobId).getAppId();
-
-    DAGClient dagClient = tezClient.getDAGClient(appId);
-    DAGStatus dagStatus = null;
-    while (true) {
-      dagStatus = dagClient.getDAGStatus();
-      if(dagStatus.getState() == DAGStatus.State.RUNNING ||
-         dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
-         dagStatus.getState() == DAGStatus.State.FAILED ||
-         dagStatus.getState() == DAGStatus.State.KILLED ||
-         dagStatus.getState() == DAGStatus.State.ERROR) {
-        break;
-      }
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException e) {
-        // continue;
+    boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true);
+
+    UserGroupInformation.setConfiguration(conf);
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+
+    TezConfiguration tezConf = new TezConfiguration(conf);
+    TezClient tezClient = new TezClient(tezConf);
+    ApplicationId appId = tezClient.createApplication();
+
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(new Path(outputPath))) {
+      throw new FileAlreadyExistsException("Output directory " + outputPath +
+          " already exists");
+    }
+
+    String baseDir = Path.SEPARATOR + "user" + Path.SEPARATOR
+        + user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR;
+    Path stagingDir = new Path(baseDir + Path.SEPARATOR + appId.toString());
+    stagingDir = fs.makeQualified(stagingDir);
+    TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
+
+    List<String> amArgs = new ArrayList<String>();
+    amArgs.add(MRHelpers.getMRAMJavaOpts(conf));
+
+    String jarPath = ClassUtil.findContainingJar(OrderedWordCount.class);
+    if (jarPath == null)  {
+        throw new TezUncheckedException("Could not find any jar containing"
+            + " OrderedWordCount.class in the classpath");
+    }
+    Path remoteJarPath = fs.makeQualified(
+        new Path(stagingDir, "dag_job.jar"));
+    fs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
+    FileStatus jarFileStatus = fs.getFileStatus(remoteJarPath);
+
+    Map<String, LocalResource> commonLocalResources =
+        new TreeMap<String, LocalResource>();
+    LocalResource dagJarLocalRsrc = LocalResource.newInstance(
+        ConverterUtils.getYarnUrlFromPath(remoteJarPath),
+        LocalResourceType.FILE,
+        LocalResourceVisibility.APPLICATION,
+        jarFileStatus.getLen(),
+        jarFileStatus.getModificationTime());
+    commonLocalResources.put("dag_job.jar", dagJarLocalRsrc);
+
+    TezSession tezSession = null;
+    if (useTezSession) {
+      LOG.info("Creating Tez Session");
+      tezSession = tezClient.createSession(appId, "OrderedWordCountSession",
+          stagingDir, null, "default", amArgs, null, commonLocalResources,
+          tezConf);
+      LOG.info("Created Tez Session");
+    }
+
+    Configuration mapStageConf = new JobConf(conf);
+    mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
+        TokenizerMapper.class.getName());
+    mapStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        Text.class.getName());
+    mapStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        IntWritable.class.getName());
+    mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
+        TextInputFormat.class.getName());
+    mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
+    mapStageConf.setBoolean("mapred.mapper.new-api", true);
+
+    InputSplitInfo inputSplitInfo =
+        MRHelpers.generateInputSplits(mapStageConf, stagingDir);
+    mapStageConf.setInt(MRJobConfig.NUM_MAPS, inputSplitInfo.getNumTasks());
+
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(mapStageConf,
+        null);
+
+    Configuration iReduceStageConf = new JobConf(conf);
+    iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2);
+    iReduceStageConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
+        IntSumReducer.class.getName());
+    iReduceStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        IntWritable.class.getName());
+    iReduceStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        Text.class.getName());
+    iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
+
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(iReduceStageConf,
+        mapStageConf);
+
+    Configuration finalReduceConf = new JobConf(conf);
+    finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
+        MyOrderByNoOpReducer.class.getName());
+    finalReduceConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        Text.class.getName());
+    finalReduceConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        IntWritable.class.getName());
+    finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
+        TextOutputFormat.class.getName());
+    finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath);
+    finalReduceConf.setBoolean("mapred.mapper.new-api", true);
+
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf,
+        iReduceStageConf);
+
+    MRHelpers.doJobClientMagic(mapStageConf);
+    MRHelpers.doJobClientMagic(iReduceStageConf);
+    MRHelpers.doJobClientMagic(finalReduceConf);
+
+    List<Vertex> vertices = new ArrayList<Vertex>();
+
+    Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
+        MapProcessor.class.getName()).setUserPayload(
+        MRHelpers.createUserPayloadFromConf(mapStageConf)),
+        inputSplitInfo.getNumTasks(),
+        MRHelpers.getMapResource(mapStageConf));
+    mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
+    mapVertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+    Map<String, LocalResource> mapLocalResources =
+        new HashMap<String, LocalResource>();
+    mapLocalResources.putAll(commonLocalResources);
+    MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo,
+        mapLocalResources);
+    mapVertex.setTaskLocalResources(mapLocalResources);
+    Map<String, String> mapEnv = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
+    mapVertex.setTaskEnvironment(mapEnv);
+    vertices.add(mapVertex);
+
+    Vertex ivertex = new Vertex("ivertex1", new ProcessorDescriptor(
+        ReduceProcessor.class.getName()).
+        setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf)),
+        2,
+        MRHelpers.getReduceResource(iReduceStageConf));
+    ivertex.setJavaOpts(MRHelpers.getReduceJavaOpts(iReduceStageConf));
+    ivertex.setTaskLocalResources(commonLocalResources);
+    Map<String, String> ireduceEnv = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(iReduceStageConf, ireduceEnv, false);
+    ivertex.setTaskEnvironment(ireduceEnv);
+    vertices.add(ivertex);
+
+    Vertex finalReduceVertex = new Vertex("finalreduce",
+        new ProcessorDescriptor(
+            ReduceProcessor.class.getName()).setUserPayload(
+                MRHelpers.createUserPayloadFromConf(finalReduceConf)),
+                1,
+                MRHelpers.getReduceResource(finalReduceConf));
+    finalReduceVertex.setJavaOpts(
+        MRHelpers.getReduceJavaOpts(finalReduceConf));
+    finalReduceVertex.setTaskLocalResources(commonLocalResources);
+    Map<String, String> reduceEnv = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(finalReduceConf, reduceEnv, false);
+    finalReduceVertex.setTaskEnvironment(reduceEnv);
+    vertices.add(finalReduceVertex);
+
+    DAG dag = new DAG("OrderedWordCount");
+    for (int i = 0; i < vertices.size(); ++i) {
+      dag.addVertex(vertices.get(i));
+      if (i != 0) {
+        dag.addEdge(new Edge(vertices.get(i-1),
+            vertices.get(i), new EdgeProperty(
+                ConnectionPattern.BIPARTITE, SourceType.STABLE,
+                new OutputDescriptor(
+                    OnFileSortedOutput.class.getName()),
+                new InputDescriptor(
+                    ShuffledMergedInput.class.getName()))));
       }
     }
 
-    while (dagStatus.getState() == DAGStatus.State.RUNNING) {
-      try {
-        ExampleDriver.printMRRDAGStatus(dagStatus);
+    DAGClient dagClient;
+    if (useTezSession) {
+      LOG.info("Submitting DAG to Tez Session");
+      dagClient = tezClient.submitDAG(tezSession, dag);
+      LOG.info("Submitted DAG to Tez Session");
+    } else {
+      LOG.info("Submitting DAG as a new Tez Application");
+      dagClient = tezClient.submitDAGApplication(appId, dag, stagingDir, null,
+          "default", "OrderedWordCount", amArgs, null, commonLocalResources,
+          tezConf);
+    }
+
+    DAGStatus dagStatus = null;
+    try {
+      while (true) {
+        dagStatus = dagClient.getDAGStatus();
+        if(dagStatus.getState() == DAGStatus.State.RUNNING ||
+            dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
+            dagStatus.getState() == DAGStatus.State.FAILED ||
+            dagStatus.getState() == DAGStatus.State.KILLED ||
+            dagStatus.getState() == DAGStatus.State.ERROR) {
+          break;
+        }
         try {
-          Thread.sleep(1000);
+          Thread.sleep(500);
         } catch (InterruptedException e) {
           // continue;
         }
-        dagStatus = dagClient.getDAGStatus();
-      } catch (TezException e) {
-        LOG.fatal("Failed to get application progress. Exiting");
-        System.exit(-1);
+      }
+
+      while (dagStatus.getState() == DAGStatus.State.RUNNING) {
+        try {
+          ExampleDriver.printMRRDAGStatus(dagStatus);
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            // continue;
+          }
+          dagStatus = dagClient.getDAGStatus();
+        } catch (TezException e) {
+          LOG.fatal("Failed to get application progress. Exiting");
+          System.exit(-1);
+        }
+      }
+    } finally {
+      fs.delete(stagingDir, true);
+      if (useTezSession) {
+        tezClient.closeSession(tezSession);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 83dc927..df5ecb7 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -37,16 +37,21 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezSession;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -170,31 +175,60 @@ public class TestMRRJobsDAGApi {
     // TODO Add cleanup code.
   }
 
-  // Submits a simple 3 stage sleep job using the DAG submit API instead of job
+  // Submits a simple 5 stage sleep job using the DAG submit API instead of job
   // client.
   @Test(timeout = 60000)
   public void testMRRSleepJobDagSubmit() throws IOException,
   InterruptedException, TezException, ClassNotFoundException, YarnException {
-    State finalState = testMRRSleepJobDagSubmitCore(false);
-    
+    State finalState = testMRRSleepJobDagSubmitCore(false, false, false);
+
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
     // TODO Add additional checks for tracking URL etc. - once it's exposed by
     // the DAG API.
   }
-  
-  // Submits a simple 3 stage sleep job using the DAG submit API instead of job
-  // client.
+
+  // Submits a simple 5 stage sleep job using the DAG submit API. Then kills it.
   @Test(timeout = 60000)
   public void testMRRSleepJobDagSubmitAndKill() throws IOException,
   InterruptedException, TezException, ClassNotFoundException, YarnException {
-    State finalState = testMRRSleepJobDagSubmitCore(true);
-    
+    State finalState = testMRRSleepJobDagSubmitCore(false, true, false);
+
+    Assert.assertEquals(DAGStatus.State.KILLED, finalState);
+    // TODO Add additional checks for tracking URL etc. - once it's exposed by
+    // the DAG API.
+  }
+
+  // Submits a DAG to AM via RPC after AM has started
+  @Test(timeout = 60000)
+  public void testMRRSleepJobPlanViaRPC() throws IOException,
+  InterruptedException, TezException, ClassNotFoundException, YarnException {
+    State finalState = testMRRSleepJobDagSubmitCore(true, false, false);
+
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+  }
+
+  // Submits a simple 5 stage sleep job using tez session. Then kills it.
+  @Test(timeout = 60000)
+  public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException,
+  InterruptedException, TezException, ClassNotFoundException, YarnException {
+    State finalState = testMRRSleepJobDagSubmitCore(true, true, false);
+
     Assert.assertEquals(DAGStatus.State.KILLED, finalState);
     // TODO Add additional checks for tracking URL etc. - once it's exposed by
     // the DAG API.
   }
-  
-  public State testMRRSleepJobDagSubmitCore(boolean killDagWhileRunning) throws IOException,
+
+  // Create and close a tez session without submitting a job
+  @Test(timeout = 60000)
+  public void testTezSessionShutdown() throws IOException,
+  InterruptedException, TezException, ClassNotFoundException, YarnException {
+    testMRRSleepJobDagSubmitCore(true, false, true);
+  }
+
+  public State testMRRSleepJobDagSubmitCore(
+      boolean dagViaRPC,
+      boolean killDagWhileRunning,
+      boolean closeSessionBeforeSubmit) throws IOException,
       InterruptedException, TezException, ClassNotFoundException, YarnException {
     LOG.info("\n\n\nStarting testMRRSleepJobDagSubmit().");
 
@@ -232,10 +266,10 @@ public class TestMRRJobsDAGApi {
         IntWritable.class.getName());
     stage2Conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
         MRRSleepJobPartitioner.class.getName());
-    
+
     JobConf stage22Conf = new JobConf(stage2Conf);
     stage22Conf.setInt(MRJobConfig.NUM_REDUCES, 2);
-    
+
     stage3Conf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, 1);
     stage3Conf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1);
     stage3Conf.setInt(MRJobConfig.NUM_REDUCES, 1);
@@ -266,7 +300,7 @@ public class TestMRRJobsDAGApi {
         remoteStagingDir);
     InputSplitInfo inputSplitInfo1 = MRHelpers.generateInputSplits(stage1Conf,
         remoteStagingDir);
-    
+
     DAG dag = new DAG("testMRRSleepJobDagSubmit");
     Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
         MapProcessor.class.getName()).setUserPayload(
@@ -282,7 +316,7 @@ public class TestMRRJobsDAGApi {
         inputSplitInfo1.getNumTasks(),  Resource.newInstance(256, 1));
     Vertex stage22Vertex = new Vertex("ireduce1", new ProcessorDescriptor(
         ReduceProcessor.class.getName()).setUserPayload(
-        MRHelpers.createUserPayloadFromConf(stage22Conf)),  
+        MRHelpers.createUserPayloadFromConf(stage22Conf)),
         2, Resource.newInstance(256, 1));
     Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor(
         ReduceProcessor.class.getName()).setUserPayload(
@@ -316,7 +350,7 @@ public class TestMRRJobsDAGApi {
         createLocalResource(remoteFs, inputSplitInfo.getSplitsMetaInfoFile(),
             LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
     stage1LocalResources.putAll(commonLocalResources);
-    
+
     Map<String, LocalResource> stage11LocalResources = new HashMap<String, LocalResource>();
     stage11LocalResources.put(
         inputSplitInfo1.getSplitsFile().getName(),
@@ -332,7 +366,7 @@ public class TestMRRJobsDAGApi {
     stage1Vertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
     stage1Vertex.setTaskLocalResources(stage1LocalResources);
     stage1Vertex.setTaskEnvironment(commonEnv);
-    
+
     stage11Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(stage1Conf));
     stage11Vertex.setTaskLocationsHint(inputSplitInfo1.getTaskLocationHints());
     stage11Vertex.setTaskLocalResources(stage11LocalResources);
@@ -342,7 +376,7 @@ public class TestMRRJobsDAGApi {
     stage2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage2Conf));
     stage2Vertex.setTaskLocalResources(commonLocalResources);
     stage2Vertex.setTaskEnvironment(commonEnv);
-    
+
     stage22Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage22Conf));
     stage22Vertex.setTaskLocalResources(commonLocalResources);
     stage22Vertex.setTaskEnvironment(commonEnv);
@@ -379,28 +413,81 @@ public class TestMRRJobsDAGApi {
     dag.addEdge(edge2);
     dag.addEdge(edge3);
 
-    Map<String, LocalResource> amLocalResources = new HashMap<String, LocalResource>();
+    Map<String, LocalResource> amLocalResources =
+        new HashMap<String, LocalResource>();
     amLocalResources.put("yarn-site.xml", yarnSiteLr);
     amLocalResources.putAll(commonLocalResources);
 
     TezClient tezClient = new TezClient(new TezConfiguration(
         mrrTezCluster.getConfig()));
-    // TODO Use utility method post TEZ-205 to figure out AM arguments etc.
-    DAGClient dagClient = tezClient.submitDAGApplication(dag, remoteStagingDir,
-        null, "default", Collections.singletonList(""), commonEnv,
-        amLocalResources, new TezConfiguration());
+    DAGClient dagClient = null;
+    TezSession tezSession = null;
+    if(!dagViaRPC) {
+      // TODO Use utility method post TEZ-205 to figure out AM arguments etc.
+      dagClient = tezClient.submitDAGApplication(dag, remoteStagingDir,
+          null, "default", Collections.singletonList(""), commonEnv,
+          amLocalResources, new TezConfiguration());
+    } else {
+      tezSession = tezClient.createSession("testsession", remoteStagingDir,
+          null, "default", Collections.singletonList(""), commonEnv,
+          amLocalResources, new TezConfiguration());
+    }
 
-    
+    if (dagViaRPC && closeSessionBeforeSubmit) {
+      YarnClient yarnClient = YarnClient.createYarnClient();
+      yarnClient.init(mrrTezCluster.getConfig());
+      yarnClient.start();
+      boolean sentKillSession = false;
+      while(true) {
+        Thread.sleep(500l);
+        ApplicationReport appReport =
+            yarnClient.getApplicationReport(tezSession.getApplicationId());
+        if (appReport == null) {
+          continue;
+        }
+        YarnApplicationState appState = appReport.getYarnApplicationState();
+        if (!sentKillSession) {
+          if (appState == YarnApplicationState.RUNNING) {
+            tezClient.closeSession(tezSession);
+            sentKillSession = true;
+          }
+        } else {
+          if (appState == YarnApplicationState.FINISHED
+              || appState == YarnApplicationState.KILLED
+              || appState == YarnApplicationState.FAILED) {
+            LOG.info("Application completed after sending session shutdown"
+                + ", yarnApplicationState=" + appState
+                + ", finalAppStatus=" + appReport.getFinalApplicationStatus());
+            Assert.assertEquals(YarnApplicationState.FINISHED,
+                appState);
+            Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
+                appReport.getFinalApplicationStatus());
+            break;
+          }
+        }
+      }
+      yarnClient.stop();
+      return null;
+    }
+
+    if(dagViaRPC) {
+      LOG.info("Submitting dag to tez session with appId="
+          + tezSession.getApplicationId());
+      dagClient = tezClient.submitDAG(tezSession, dag);
+    }
     DAGStatus dagStatus = dagClient.getDAGStatus();
     while (!dagStatus.isCompleted()) {
-      LOG.info("Waiting for job to complete. Sleeping for 500ms. Current state: "
-          + dagStatus.getState());
-      // TODO The test will fail if the AM sleep is removed. TEZ-207 to fix
-      // this.
+      LOG.info("Waiting for job to complete. Sleeping for 500ms."
+          + " Current state: " + dagStatus.getState());
       Thread.sleep(500l);
-      if(killDagWhileRunning && dagStatus.getState() == DAGStatus.State.RUNNING){
-        dagClient.tryKillDAG();
-        dagStatus = dagClient.getDAGStatus();
+      if(killDagWhileRunning
+          && dagStatus.getState() == DAGStatus.State.RUNNING) {
+        LOG.info("Killing running dag/session");
+        if (dagViaRPC) {
+          tezClient.closeSession(tezSession);
+        } else {
+          dagClient.tryKillDAG();
+        }
       }
       dagStatus = dagClient.getDAGStatus();
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 40a9563..7360af4 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -569,6 +569,7 @@ public class YARNRunner implements ClientProtocol {
           ts,
           jobConf.get(JobContext.QUEUE_NAME,
               YarnConfiguration.DEFAULT_QUEUE_NAME),
+          dag.getName(),
           vargs,
           environment,
           jobLocalResources, dagAMConf);


Mime
View raw message