tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-408. Add support for multiple DAGs within the same AM. (hitesh)
Date Fri, 27 Sep 2013 17:32:02 GMT
Updated Branches:
  refs/heads/master fcbf0ddaf -> 849f12579


TEZ-408. Add support for multiple DAGs within the same AM. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/849f1257
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/849f1257
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/849f1257

Branch: refs/heads/master
Commit: 849f12579654bdf168c9f14bb68eb9205e7787b1
Parents: fcbf0dd
Author: Hitesh Shah <hitesh@apache.org>
Authored: Fri Sep 27 10:31:42 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Fri Sep 27 10:31:42 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/client/TezClientUtils.java   |   7 +-
 .../java/org/apache/tez/client/TezSession.java  |  17 +-
 .../common/counters/FileSystemCounterGroup.java |   2 -
 .../common/counters/FrameworkCounterGroup.java  |   8 +-
 .../org/apache/tez/dag/api/TezConstants.java    |   6 +-
 .../apache/hadoop/mapred/YarnTezDagChild.java   |  12 +-
 .../java/org/apache/tez/dag/app/AppContext.java |   2 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 247 ++++++++++++------
 .../apache/tez/dag/app/DAGAppMasterState.java   |   1 +
 .../tez/dag/app/dag/TaskTerminationCause.java   |   4 +-
 .../dag/event/DAGAppMasterEventDAGFinished.java |  12 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  12 +-
 .../app/rm/container/AMContainerHelpers.java    |  26 +-
 .../dag/app/rm/container/AMContainerImpl.java   |   4 +-
 .../mapreduce/examples/OrderedWordCount.java    | 253 +++++++++++--------
 .../org/apache/tez/mapreduce/TestMRRJobs.java   |   5 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 114 ++++++++-
 17 files changed, 503 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 7c6a5ed..8689385 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -261,6 +261,10 @@ public class TezClientUtils {
         TezConfiguration.DEFAULT_TEZ_AM_JAVA_OPTS));
 
     vargs.add(TezConfiguration.TEZ_APPLICATION_MASTER_CLASS);
+    if (dag == null) {
+      vargs.add("--" + TezConstants.TEZ_SESSION_MODE_CLI_OPTION);
+    }
+
     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
         File.separator + ApplicationConstants.STDOUT);
     vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
@@ -396,9 +400,6 @@ public class TezClientUtils {
                 textPath, LocalResourceType.FILE,
                 LocalResourceVisibility.APPLICATION));
       }
-    } else {
-      Apps.addToEnvironment(environment,
-          TezConstants.TEZ_AM_IS_SESSION_ENV, "set");
     }
 
     Map<ApplicationAccessType, String> acls

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index acf523d..dd7fcab 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -88,6 +88,8 @@ public class TezSession {
               sessionConfig.getTezConfiguration(), applicationId,
               null, sessionName, sessionConfig.getAMConfiguration(),
               tezJarResources);
+      // Set Tez Sessions to not retry on AM crashes
+      appContext.setMaxAppAttempts(1);
       tezConfPBLRsrc = appContext.getAMContainerSpec().getLocalResources().get(
           TezConfiguration.TEZ_PB_BINARY_CONF_NAME);
       yarnClient.submitApplication(appContext);
@@ -121,6 +123,7 @@ public class TezSession {
 
     DAGClientAMProtocolBlockingPB proxy;
     while (true) {
+      // FIXME implement a max time to wait for submit
       proxy = TezClientUtils.getAMProxy(yarnClient,
           sessionConfig.getYarnConfiguration(), applicationId);
       if (proxy != null) {
@@ -150,17 +153,19 @@ public class TezSession {
     LOG.info("Shutting down Tez Session"
         + ", sessionName=" + sessionName
         + ", applicationId=" + applicationId);
-    DAGClientAMProtocolBlockingPB proxy = TezClientUtils.getAMProxy(yarnClient,
-        sessionConfig.getYarnConfiguration(), applicationId);
-    if (proxy != null) {
-      try {
+    try {
+      DAGClientAMProtocolBlockingPB proxy = TezClientUtils.getAMProxy(
+          yarnClient, sessionConfig.getYarnConfiguration(), applicationId);
+      if (proxy != null) {
         ShutdownSessionRequestProto request =
             ShutdownSessionRequestProto.newBuilder().build();
         proxy.shutdownSession(null, request);
         return;
-      } catch (ServiceException e) {
-        LOG.info("Failed to shutdown Tez Session via proxy", e);
       }
+    } catch (TezException e) {
+      LOG.info("Failed to shutdown Tez Session via proxy", e);
+    } catch (ServiceException e) {
+      LOG.info("Failed to shutdown Tez Session via proxy", e);
     }
     LOG.info("Could not connect to AM, killing session via YARN"
         + ", sessionName=" + sessionName

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
index d4b167a..bb2dc8b 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
@@ -232,7 +232,6 @@ public abstract class FileSystemCounterGroup<C extends TezCounter>
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   public void incrAllCounters(CounterGroupBase<C> other) {
     if (checkNotNull(other.getUnderlyingGroup(), "other group")
         instanceof FileSystemCounterGroup<?>) {
@@ -255,7 +254,6 @@ public abstract class FileSystemCounterGroup<C extends TezCounter>
       WritableUtils.writeVInt(out, numSetCounters(entry.getValue()));
       for (Object counter : entry.getValue()) {
         if (counter == null) continue;
-        @SuppressWarnings("unchecked")
         FSCounter c = (FSCounter) ((TezCounter)counter).getUnderlyingCounter();
         WritableUtils.writeVInt(out, c.key.ordinal());  // key
         WritableUtils.writeVLong(out, c.getValue());    // value

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java b/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
index 42fb636..a99e5a4 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
@@ -26,8 +26,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.WritableUtils;
 
@@ -44,8 +42,7 @@ import com.google.common.collect.Iterators;
 @InterfaceAudience.Private
 public abstract class FrameworkCounterGroup<T extends Enum<T>,
     C extends TezCounter> implements CounterGroupBase<C> {
-  private static final Log LOG = LogFactory.getLog(FrameworkCounterGroup.class);
-  
+
   private final Class<T> enumClass; // for Enum.valueOf
   private final Object[] counters;  // local casts are OK and save a class ref
   private String displayName = null;
@@ -106,7 +103,6 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
     }
   }
 
-  @SuppressWarnings("unchecked")
   public FrameworkCounterGroup(Class<T> enumClass) {
     this.enumClass = enumClass;
     T[] enums = enumClass.getEnumConstants();
@@ -194,8 +190,8 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
     return n;
   }
 
+  @SuppressWarnings("rawtypes")
   @Override
-  @SuppressWarnings("unchecked")
   public void incrAllCounters(CounterGroupBase<C> other) {
     if (checkNotNull(other, "other counter group")
         instanceof FrameworkCounterGroup<?, ?>) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
index 5463d65..5109a0f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -23,7 +23,9 @@ package org.apache.tez.dag.api;
  */
 public class TezConstants {
 
-  // Env variable names
-  public static final String TEZ_AM_IS_SESSION_ENV = "TEZ_AM_IS_SESSION";
+  /**
+   * Command-line argument to be set when running the Tez AM in session mode.
+   */
+  public static final String TEZ_SESSION_MODE_CLI_OPTION = "session";
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 3092837..a20b774 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -368,7 +368,7 @@ public class YarnTezDagChild {
         for (int idle = 0; null == containerTask; ++idle) {
           long sleepTimeMilliSecs = Math.min(idle * 10, getTaskMaxSleepTime);
           LOG.info("Sleeping for " + sleepTimeMilliSecs
-              + "ms before retrying again. Got null now.");
+              + "ms before retrying getTask again. Got null now.");
           MILLISECONDS.sleep(sleepTimeMilliSecs);
           containerTask = umbilical.getTask(containerContext);
         }
@@ -404,7 +404,7 @@ public class YarnTezDagChild {
           }
           lastVertexId = newVertexId;
           updateLoggers(currentTaskAttemptID);
-          
+
           currentTask = createLogicalTask(attemptNumber, taskSpec,
               defaultConf, tezUmbilical, serviceConsumerMetadata);
         } finally {
@@ -426,9 +426,15 @@ public class YarnTezDagChild {
         childUGI.doAs(new PrivilegedExceptionAction<Object>() {
           @Override
           public Object run() throws Exception {
+            LOG.info("Initializing task"
+                + ", taskAttemptId=" + currentTaskAttemptID);
             currentTask.initialize();
             if (!currentTask.hadFatalError()) {
+              LOG.info("Running task"
+                  + ", taskAttemptId=" + currentTaskAttemptID);
               currentTask.run();
+              LOG.info("Closing task"
+                  + ", taskAttemptId=" + currentTaskAttemptID);
               currentTask.close();
             }
             LOG.info("Task completed"
@@ -525,7 +531,7 @@ public class YarnTezDagChild {
     String [] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name()));
     conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
     LOG.info("LocalDirs for child: " + Arrays.toString(localDirs));
-    
+
     return new LogicalIOProcessorRuntimeTask(taskSpec, attemptNum, conf,
         tezUmbilical, serviceConsumerMetadata);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 64f8965..26c0992 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -42,7 +42,7 @@ public interface AppContext {
 
   DAGAppMaster getAppMaster();
 
-  Configuration getConf();
+  Configuration getAMConf();
 
   ApplicationId getApplicationID();
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index b846992..88d1a30 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -35,6 +35,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -140,6 +143,7 @@ public class DAGAppMaster extends AbstractService {
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
   private Clock clock;
+  private final boolean isSession;
   private long appsStartTime;
   private final long startTime;
   private final long appSubmitTime;
@@ -152,7 +156,7 @@ public class DAGAppMaster extends AbstractService {
   private AMContainerMap containers;
   private AMNodeMap nodes;
   private AppContext context;
-  private Configuration conf;
+  private Configuration amConf;
   private Dispatcher dispatcher;
   private ContainerLauncher containerLauncher;
   private TaskCleaner taskCleaner;
@@ -178,20 +182,31 @@ public class DAGAppMaster extends AbstractService {
   private Credentials fsTokens = new Credentials(); // Filled during init
   private UserGroupInformation currentUser; // Will be setup during init
 
+  private AtomicBoolean sessionStopped = new AtomicBoolean(false);
+
+  // DAG Counter
+  private final AtomicInteger dagCounter = new AtomicInteger();
+
+  // Session counters
+  private final AtomicInteger submittedDAGs = new AtomicInteger();
+  private final AtomicInteger successfulDAGs = new AtomicInteger();
+  private final AtomicInteger failedDAGs = new AtomicInteger();
+  private final AtomicInteger killedDAGs = new AtomicInteger();
+
   // must be LinkedHashMap to preserve order of service addition
   Map<Service, ServiceWithDependency> services =
       new LinkedHashMap<Service, ServiceWithDependency>();
 
   public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
-      long appSubmitTime) {
+      long appSubmitTime, boolean isSession) {
     this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
-        new SystemClock(), appSubmitTime);
+        new SystemClock(), appSubmitTime, isSession);
   }
 
   public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
-      Clock clock, long appSubmitTime) {
+      Clock clock, long appSubmitTime, boolean isSession) {
     super(DAGAppMaster.class.getName());
     this.clock = clock;
     this.startTime = clock.getTime();
@@ -202,6 +217,7 @@ public class DAGAppMaster extends AbstractService {
     this.nmPort = nmPort;
     this.nmHttpPort = nmHttpPort;
     this.state = DAGAppMasterState.NEW;
+    this.isSession = isSession;
     // TODO Metrics
     //this.metrics = DAGAppMetrics.create();
     LOG.info("Created DAGAppMaster for application " + applicationAttemptId);
@@ -212,7 +228,7 @@ public class DAGAppMaster extends AbstractService {
 
     this.state = DAGAppMasterState.INITED;
 
-    this.conf = conf;
+    this.amConf = conf;
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
     downloadTokensAndSetupUGI(conf);
@@ -307,10 +323,47 @@ public class DAGAppMaster extends AbstractService {
       }
       break;
     case DAG_FINISHED:
-      setStateOnDAGCompletion();
-      LOG.info("Shutting down on completion of dag:" +
-              ((DAGAppMasterEventDAGFinished)event).getDAGId().toString());
-      shutdownHandler.shutdown();
+      DAGAppMasterEventDAGFinished finishEvt =
+          (DAGAppMasterEventDAGFinished) event;
+      if (!isSession) {
+        setStateOnDAGCompletion();
+        LOG.info("Shutting down on completion of dag:" +
+              finishEvt.getDAGId().toString());
+        shutdownHandler.shutdown();
+      } else {
+        LOG.info("DAG completed, dagId="
+            + finishEvt.getDAGId().toString()
+            + ", dagState=" + finishEvt.getDAGState());
+        switch(finishEvt.getDAGState()) {
+        case SUCCEEDED:
+          successfulDAGs.incrementAndGet();
+          break;
+        case ERROR:
+        case FAILED:
+          failedDAGs.incrementAndGet();
+          break;
+        case KILLED:
+          killedDAGs.incrementAndGet();
+          break;
+        default:
+          LOG.fatal("Received a DAG Finished Event with state="
+              + finishEvt.getDAGState()
+              + ". Error. Shutting down.");
+          state = DAGAppMasterState.ERROR;
+          shutdownHandler.shutdown();
+          break;
+        }
+        if (!state.equals(DAGAppMasterState.ERROR)) {
+          if (!sessionStopped.get()) {
+            LOG.info("Waiting for next DAG to be submitted.");
+            state = DAGAppMasterState.IDLE;
+          } else {
+            LOG.info("Session shutting down now.");
+            state = DAGAppMasterState.SUCCEEDED;
+            shutdownHandler.shutdown();
+          }
+        }
+      }
       break;
     default:
       throw new TezUncheckedException(
@@ -373,10 +426,21 @@ public class DAGAppMaster extends AbstractService {
 
   /** Create and initialize (but don't start) a single dag. */
   protected DAG createDAG(DAGPlan dagPB) {
-    TezDAGID dagId = new TezDAGID(appAttemptID.getApplicationId(), 1);
-    // create single job
+    TezDAGID dagId = new TezDAGID(appAttemptID.getApplicationId(),
+        dagCounter.incrementAndGet());
+
+    Iterator<PlanKeyValuePair> iter =
+        dagPB.getDagKeyValues().getConfKeyValuesList().iterator();
+    Configuration dagConf = new Configuration(amConf);
+
+    while (iter.hasNext()) {
+      PlanKeyValuePair keyValPair = iter.next();
+      dagConf.set(keyValPair.getKey(), keyValPair.getValue());
+    }
+
+    // create single dag
     DAG newDag =
-        new DAGImpl(dagId, conf, dagPB, dispatcher.getEventHandler(),
+        new DAGImpl(dagId, dagConf, dagPB, dispatcher.getEventHandler(),
             taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
             currentUser.getShortUserName(),
             taskHeartbeatHandler, context);
@@ -524,8 +588,15 @@ public class DAGAppMaster extends AbstractService {
   }
 
   public List<String> getDiagnostics() {
-    if(currentDAG != null) {
-      return currentDAG.getDiagnostics();
+    if (!isSession) {
+      if(currentDAG != null) {
+        return currentDAG.getDiagnostics();
+      }
+    } else {
+      return Collections.singletonList("Session stats:"
+          + "submittedDAGs=" + submittedDAGs.get()
+          + ", successfulDAGs=" + successfulDAGs.get()
+          + ", failedDAGs=" + failedDAGs.get());
     }
     return null;
   }
@@ -539,37 +610,74 @@ public class DAGAppMaster extends AbstractService {
 
   private synchronized void setStateOnDAGCompletion() {
     DAGAppMasterState oldState = state;
-    if(state == DAGAppMasterState.RUNNING) {
-      switch(currentDAG.getState()) {
-      case SUCCEEDED:
-        state = DAGAppMasterState.SUCCEEDED;
-        break;
-      case FAILED:
-        state = DAGAppMasterState.FAILED;
-        break;
-      case KILLED:
-        state = DAGAppMasterState.KILLED;
-        break;
-      case ERROR:
-        state = DAGAppMasterState.ERROR;
-        break;
-      default:
-        state = DAGAppMasterState.ERROR;
-        break;
-      }
+    if(isSession) {
+      return;
+    }
+    switch(currentDAG.getState()) {
+    case SUCCEEDED:
+      state = DAGAppMasterState.SUCCEEDED;
+      break;
+    case FAILED:
+      state = DAGAppMasterState.FAILED;
+      break;
+    case KILLED:
+      state = DAGAppMasterState.KILLED;
+      break;
+    case ERROR:
+      state = DAGAppMasterState.ERROR;
+      break;
+    default:
+      state = DAGAppMasterState.ERROR;
+      break;
     }
     LOG.info("On DAG completion. Old state: "
         + oldState + " new state: " + state);
   }
 
+  synchronized String submitDAGToAppMaster(DAGPlan dagPlan)
+      throws TezException  {
+    if(currentDAG != null
+        && !state.equals(DAGAppMasterState.IDLE)) {
+      throw new TezException("App master already running a DAG");
+    }
+    if (state.equals(DAGAppMasterState.ERROR)
+        || sessionStopped.get()) {
+      throw new TezException("AM unable to accept new DAG submissions."
+          + " In the process of shutting down");
+    }
+    
+    // RPC server runs in the context of the job user as it was started in
+    // the job user's UGI context
+    LOG.info("Starting DAG submitted via RPC");
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Writing DAG plan to: "
+          + TezConfiguration.TEZ_PB_PLAN_TEXT_NAME);
+
+      File outFile = new File(TezConfiguration.TEZ_PB_PLAN_TEXT_NAME);
+      try {
+        PrintWriter printWriter = new PrintWriter(outFile);
+        String dagPbString = dagPlan.toString();
+        printWriter.println(dagPbString);
+        printWriter.close();
+      } catch (IOException e) {
+        throw new TezException("Failed to write TEZ_PLAN to "
+            + outFile.toString(), e);
+      }
+    }
+
+    submittedDAGs.incrementAndGet();
+    startDAG(dagPlan);
+    return currentDAG.getID().toString();
+  }
+
   public class DAGClientHandler {
 
     public List<String> getAllDAGs() throws TezException {
       return Collections.singletonList(currentDAG.getID().toString());
     }
 
-    public DAGStatus getDAGStatus(String dagIdStr)
-                                      throws TezException {
+    public DAGStatus getDAGStatus(String dagIdStr) throws TezException {
       return getDAG(dagIdStr).getDAGStatus();
     }
 
@@ -588,6 +696,7 @@ public class DAGAppMaster extends AbstractService {
       if(dagId == null) {
         throw new TezException("Bad dagId: " + dagIdStr);
       }
+
       if(currentDAG == null) {
         throw new TezException("No running dag at present");
       }
@@ -607,29 +716,7 @@ public class DAGAppMaster extends AbstractService {
     }
 
     public synchronized String submitDAG(DAGPlan dagPlan) throws TezException {
-      if(currentDAG != null) {
-        throw new TezException("App master already running a DAG");
-      }
-      // RPC server runs in the context of the job user as it was started in
-      // the job user's UGI context
-      LOG.info("Starting DAG submitted via RPC");
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Writing DAG plan to: " + TezConfiguration.TEZ_PB_PLAN_TEXT_NAME);
-        
-        File outFile = new File(TezConfiguration.TEZ_PB_PLAN_TEXT_NAME);
-        try {
-          PrintWriter printWriter = new PrintWriter(outFile);
-          String dagPbString = dagPlan.toString();
-          printWriter.println(dagPbString);
-          printWriter.close();
-        } catch (IOException e) {
-          throw new TezException("Failed to write TEZ_PLAN to " + outFile.toString(), e);
-        }
-        
-        
-      }
-      startDAG(dagPlan);
-      return currentDAG.getID().toString();
+      return submitDAGToAppMaster(dagPlan);
     }
 
     public synchronized void shutdownAM() {
@@ -640,8 +727,12 @@ public class DAGAppMaster extends AbstractService {
         LOG.info("Sending a kill event to the current DAG"
             + ", dagId=" + currentDAG.getID());
         sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL));
+        sessionStopped.set(true);
       } else {
         LOG.info("No current running DAG, shutting down the AM");
+        if (isSession && !state.equals(DAGAppMasterState.ERROR)) {
+          state = DAGAppMasterState.SUCCEEDED;
+        }
         shutdownHandler.shutdown();
       }
     }
@@ -665,7 +756,7 @@ public class DAGAppMaster extends AbstractService {
     }
 
     @Override
-    public Configuration getConf() {
+    public Configuration getAMConf() {
       return conf;
     }
 
@@ -944,7 +1035,7 @@ public class DAGAppMaster extends AbstractService {
     startServices();
     super.serviceStart();
 
-    this.state = DAGAppMasterState.RUNNING;
+    this.state = DAGAppMasterState.IDLE;
 
     // metrics system init is really init & start.
     // It's more test friendly to put it here.
@@ -955,6 +1046,12 @@ public class DAGAppMaster extends AbstractService {
         startTime, appsStartTime, appSubmitTime);
     dispatcher.getEventHandler().handle(
         new DAGHistoryEvent(startEvent));
+
+    if (!isSession) {
+      startDAG();
+    } else {
+      LOG.info("In Session mode. Waiting for DAG over RPC");
+    }
   }
 
   @Override
@@ -1049,10 +1146,18 @@ public class DAGAppMaster extends AbstractService {
       // the objects myself.
       conf.setBoolean("fs.automatic.close", false);
 
+      // Command line options
+      Options opts = new Options();
+      opts.addOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION,
+          false, "Run Tez Application Master in Session mode");
+
+      CommandLine cliParser = new GnuParser().parse(opts, args);
+
       DAGAppMaster appMaster =
           new DAGAppMaster(applicationAttemptId, containerId, nodeHostString,
               Integer.parseInt(nodePortString),
-              Integer.parseInt(nodeHttpPortString), appSubmitTime);
+              Integer.parseInt(nodeHttpPortString), appSubmitTime,
+              cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION));
       ShutdownHookManager.get().addShutdownHook(
         new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
 
@@ -1086,12 +1191,13 @@ public class DAGAppMaster extends AbstractService {
         LOG.info("DAGAppMaster received a signal. Signaling TaskScheduler");
         appMaster.taskSchedulerEventHandler.setSignalled(true);
       }
+
       if (EnumSet.of(DAGAppMasterState.NEW, DAGAppMasterState.INITED,
-          DAGAppMasterState.RUNNING).contains(appMaster.state)) {
-        // DAG not in a final state. Must have receive a KILL signal
+          DAGAppMasterState.IDLE, DAGAppMasterState.RUNNING)
+          .contains(appMaster.state)) {
+            // DAG not in a final state. Must have receive a KILL signal
         appMaster.state = DAGAppMasterState.KILLED;
       }
-
       appMaster.stop();
     }
   }
@@ -1127,13 +1233,6 @@ public class DAGAppMaster extends AbstractService {
       }
     }
 
-    Iterator<PlanKeyValuePair> iter =
-        dagPlan.getDagKeyValues().getConfKeyValuesList().iterator();
-    while (iter.hasNext()) {
-      PlanKeyValuePair keyValPair = iter.next();
-      conf.set(keyValPair.getKey(), keyValPair.getValue());
-    }
-
     // Job name is the same as the app name until we support multiple dags
     // for an app later
     appName = dagPlan.getName();
@@ -1145,6 +1244,8 @@ public class DAGAppMaster extends AbstractService {
 
   private void startDAG(DAG dag) {
     currentDAG = dag;
+    this.state = DAGAppMasterState.RUNNING;
+
     // End of creating the job.
     ((RunningAppContext) context).setDAG(currentDAG);
 
@@ -1177,12 +1278,6 @@ public class DAGAppMaster extends AbstractService {
       public Object run() throws Exception {
         appMaster.init(conf);
         appMaster.start();
-        String submitDAGOverRpc = System.getenv(TezConstants.TEZ_AM_IS_SESSION_ENV);
-        if(submitDAGOverRpc == null || submitDAGOverRpc.isEmpty()) {
-          appMaster.startDAG();
-        } else {
-          LOG.info("Waiting for DAG over RPC");
-        }
         return null;
       }
     });

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterState.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterState.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterState.java
index f598eeb..a410c0a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterState.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterState.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app;
 public enum DAGAppMasterState {
   NEW,
   INITED,
+  IDLE,
   RUNNING,
   SUCCEEDED,
   FAILED,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
index 6736d2a..321dd01 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
@@ -24,8 +24,8 @@ package org.apache.tez.dag.app.dag;
 public enum TaskTerminationCause {
 
   /** DAG was killed  */
-  DAG_KILL, 
-  
+  DAG_KILL,
+
   /** Other vertex failed causing DAG to fail thus killing the parent vertex  */
   OTHER_VERTEX_FAILURE,
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDAGFinished.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDAGFinished.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDAGFinished.java
index cad26d1..f58dace 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDAGFinished.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDAGFinished.java
@@ -18,17 +18,25 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.records.TezDAGID;
 
 public class DAGAppMasterEventDAGFinished extends DAGAppMasterEvent {
   private final TezDAGID dagId;
+  private final DAGState dagState;
 
-  public DAGAppMasterEventDAGFinished(TezDAGID dagId) {
+  public DAGAppMasterEventDAGFinished(TezDAGID dagId,
+      DAGState dagState) {
     super(DAGAppMasterEventType.DAG_FINISHED);
     this.dagId = dagId;
+    this.dagState = dagState;
   }
-  
+
   public TezDAGID getDAGId() {
     return dagId;
   }
+
+  public DAGState getDAGState() {
+    return dagState;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index abde208..a8d7c30 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -713,7 +713,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
     */
     if (finishTime == 0) setFinishTime();
-    eventHandler.handle(new DAGAppMasterEventDAGFinished(getID()));
+    eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState));
 
     // TODO Metrics
     /*
@@ -869,7 +869,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         return dag.finished(DAGState.FAILED);
       }
     }
-    
+
     private void createDAGEdges(DAGImpl dag) {
       for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) {
         EdgeProperty edgeProperty = DagTypeConverters
@@ -1156,7 +1156,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
 
   }
-  
+
   private static class VertexReRunningTransition implements
       SingleArcTransition<DAGImpl, DAGEvent> {
     @Override
@@ -1165,7 +1165,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
       job.numCompletedVertices--;
       job.vertexReRunning(vertex);
-      
+
 
       LOG.info("Vertex " + vertex.getVertexId() + " re-running."
           + ", numCompletedVertices=" + job.numCompletedVertices
@@ -1175,13 +1175,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           + ", numVertices=" + job.numVertices);
     }
   }
-  
+
   private void vertexSucceeded(Vertex vertex) {
     numSuccessfulVertices++;
     // TODO: Metrics
     //job.metrics.completedTask(task);
   }
-  
+
   private void vertexReRunning(Vertex vertex) {
     numSuccessfulVertices--;
     addDiagnostic("Vertex re-running " + vertex.getVertexId());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 11e8aeb..be7d99d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.utils.TezRuntimeChildJVM;
 import org.apache.tez.runtime.library.common.security.TokenCache;
 import org.apache.tez.runtime.library.common.shuffle.server.ShuffleHandler;
@@ -55,7 +56,10 @@ public class AMContainerHelpers {
   private static final Log LOG = LogFactory.getLog(AMContainerHelpers.class);
 
   private static Object commonContainerSpecLock = new Object();
-  private static ContainerLaunchContext commonContainerSpec = null;
+  private static TezDAGID lastDAGID = null;
+  private static Map<TezDAGID, ContainerLaunchContext> commonContainerSpecs =
+      new HashMap<TezDAGID, ContainerLaunchContext>();
+
 
   /**
    * Create a {@link LocalResource} record with all the given parameters.
@@ -134,6 +138,7 @@ public class AMContainerHelpers {
 
   @VisibleForTesting
   public static ContainerLaunchContext createContainerLaunchContext(
+      TezDAGID tezDAGID,
       Map<ApplicationAccessType, String> acls,
       ContainerId containerId,
       Map<String, LocalResource> localResources,
@@ -142,10 +147,23 @@ public class AMContainerHelpers {
       TaskAttemptListener taskAttemptListener, Credentials credentials,
       boolean shouldProfile, AppContext appContext) {
 
+    ContainerLaunchContext commonContainerSpec = null;
     synchronized (commonContainerSpecLock) {
-      if (commonContainerSpec == null) {
-        commonContainerSpec = createCommonContainerLaunchContext(
-            acls, credentials);
+      if (!commonContainerSpecs.containsKey(tezDAGID)) {
+        commonContainerSpec =
+            createCommonContainerLaunchContext(acls, credentials);
+        commonContainerSpecs.put(tezDAGID, commonContainerSpec);
+      } else {
+        commonContainerSpec = commonContainerSpecs.get(tezDAGID);
+      }
+
+      // Ensure that we remove container specs for previous DAGs to reduce
+      // memory footprint
+      if (lastDAGID == null) {
+        lastDAGID = tezDAGID;
+      } else if (!lastDAGID.equals(tezDAGID)) {
+        commonContainerSpecs.remove(lastDAGID);
+        lastDAGID = tezDAGID;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 4731a24..9303b62 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -71,7 +71,8 @@ public class AMContainerImpl implements AMContainer {
   private final TaskAttemptListener taskAttemptListener;
   protected final EventHandler eventHandler;
 
-  private final List<TezTaskAttemptID> completedAttempts = new LinkedList<TezTaskAttemptID>();
+  private final List<TezTaskAttemptID> completedAttempts =
+      new LinkedList<TezTaskAttemptID>();
 
   // TODO Maybe this should be pulled from the TaskAttempt.s
   private final Map<TezTaskAttemptID, TaskSpec> remoteTaskMap =
@@ -330,6 +331,7 @@ public class AMContainerImpl implements AMContainer {
       ContainerContext containerContext = event.getContainerContext();
 
       container.clc = AMContainerHelpers.createContainerLaunchContext(
+          container.appContext.getCurrentDAGID(),
           container.appContext.getApplicationACLs(),
           container.getContainerId(),
           containerContext.getLocalResources(),

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index cad79f5..9d54a98 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -82,6 +82,12 @@ import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 /**
  * An MRR job built on top of word count to return words sorted by
  * their frequency of occurrence.
+ *
+ * Use -DUSE_TEZ_SESSION=true to run jobs in session mode.
+ * If multiple input/outputs are provided, this job will process each pair
+ * as a separate DAG in a sequential manner.
+ * Use -DINTER_JOB_SLEEP_INTERVAL=<N> where N is the sleep interval in seconds
+ * between the sequential DAGs.
  */
 public class OrderedWordCount {
 
@@ -136,74 +142,9 @@ public class OrderedWordCount {
     }
   }
 
-  public static void main(String[] args) throws Exception {
-    Configuration conf = new Configuration();
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    if (otherArgs.length != 2) {
-      System.err.println("Usage: wordcount <in> <out>");
-      System.exit(2);
-    }
-    String inputPath = otherArgs[0];
-    String outputPath = otherArgs[1];
-
-    boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true);
-
-    UserGroupInformation.setConfiguration(conf);
-    String user = UserGroupInformation.getCurrentUser().getShortUserName();
-
-    TezConfiguration tezConf = new TezConfiguration(conf);
-    TezClient tezClient = new TezClient(tezConf);
-    ApplicationId appId = tezClient.createApplication();
-
-    FileSystem fs = FileSystem.get(conf);
-    if (fs.exists(new Path(outputPath))) {
-      throw new FileAlreadyExistsException("Output directory " + outputPath +
-          " already exists");
-    }
-
-    String stagingDirStr = Path.SEPARATOR + "user" + Path.SEPARATOR
-        + user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR
-        + Path.SEPARATOR + appId.toString();
-    Path stagingDir = new Path(stagingDirStr);
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
-    stagingDir = fs.makeQualified(stagingDir);
-    TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
-
-    tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
-        MRHelpers.getMRAMJavaOpts(conf));
-
-    String jarPath = ClassUtil.findContainingJar(OrderedWordCount.class);
-    if (jarPath == null)  {
-        throw new TezUncheckedException("Could not find any jar containing"
-            + " OrderedWordCount.class in the classpath");
-    }
-    Path remoteJarPath = fs.makeQualified(
-        new Path(stagingDir, "dag_job.jar"));
-    fs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
-    FileStatus jarFileStatus = fs.getFileStatus(remoteJarPath);
-
-    Map<String, LocalResource> commonLocalResources =
-        new TreeMap<String, LocalResource>();
-    LocalResource dagJarLocalRsrc = LocalResource.newInstance(
-        ConverterUtils.getYarnUrlFromPath(remoteJarPath),
-        LocalResourceType.FILE,
-        LocalResourceVisibility.APPLICATION,
-        jarFileStatus.getLen(),
-        jarFileStatus.getModificationTime());
-    commonLocalResources.put("dag_job.jar", dagJarLocalRsrc);
-
-    TezSession tezSession = null;
-    AMConfiguration amConfig = new AMConfiguration("default", null,
-        commonLocalResources, tezConf, null);
-    if (useTezSession) {
-      LOG.info("Creating Tez Session");
-      TezSessionConfiguration sessionConfig =
-          new TezSessionConfiguration(amConfig, tezConf);
-      tezSession = new TezSession("OrderedWordCountSession",
-          sessionConfig);
-      tezSession.start();
-      LOG.info("Created Tez Session");
-    }
+  private static DAG createDAG(FileSystem fs, Configuration conf,
+      Map<String, LocalResource> commonLocalResources, Path stagingDir,
+      int dagIndex, String inputPath, String outputPath) throws Exception {
 
     Configuration mapStageConf = new JobConf(conf);
     mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
@@ -303,62 +244,170 @@ public class OrderedWordCount {
     finalReduceVertex.setTaskEnvironment(reduceEnv);
     vertices.add(finalReduceVertex);
 
-    DAG dag = new DAG("OrderedWordCount");
+    DAG dag = new DAG("OrderedWordCount" + dagIndex);
     for (int i = 0; i < vertices.size(); ++i) {
       dag.addVertex(vertices.get(i));
       if (i != 0) {
         dag.addEdge(new Edge(vertices.get(i-1),
             vertices.get(i), new EdgeProperty(
                 DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
-                SchedulingType.SEQUENTIAL, 
+                SchedulingType.SEQUENTIAL,
                 new OutputDescriptor(
                     OnFileSortedOutput.class.getName()),
                 new InputDescriptor(
                     ShuffledMergedInputLegacy.class.getName()))));
       }
     }
+    return dag;
+  }
 
-    DAGClient dagClient;
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true);
+    long interJobSleepTimeout = conf.getInt("INTER_JOB_SLEEP_INTERVAL", 0)
+        * 1000;
+    if (((otherArgs.length%2) != 0)
+        || (!useTezSession && otherArgs.length != 2)) {
+      System.err.println("Usage: wordcount <in> <out>");
+      System.err.println("Usage (In Session Mode):"
+          + " wordcount <in1> <out1> ... <inN> <outN>");
+      System.exit(2);
+    }
+    List<String> inputPaths = new ArrayList<String>();
+    List<String> outputPaths = new ArrayList<String>();
+
+    for (int i = 0; i < otherArgs.length; i+=2) {
+      inputPaths.add(otherArgs[i]);
+      outputPaths.add(otherArgs[i+1]);
+    }
+
+    UserGroupInformation.setConfiguration(conf);
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+
+    TezConfiguration tezConf = new TezConfiguration(conf);
+    TezClient tezClient = new TezClient(tezConf);
+    ApplicationId appId = tezClient.createApplication();
+
+    FileSystem fs = FileSystem.get(conf);
+
+    String stagingDirStr = Path.SEPARATOR + "user" + Path.SEPARATOR
+        + user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR
+        + Path.SEPARATOR + appId.toString();
+    Path stagingDir = new Path(stagingDirStr);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
+    stagingDir = fs.makeQualified(stagingDir);
+    TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
+
+    tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
+        MRHelpers.getMRAMJavaOpts(conf));
+
+    String jarPath = ClassUtil.findContainingJar(OrderedWordCount.class);
+    if (jarPath == null)  {
+        throw new TezUncheckedException("Could not find any jar containing"
+            + " OrderedWordCount.class in the classpath");
+    }
+    Path remoteJarPath = fs.makeQualified(
+        new Path(stagingDir, "dag_job.jar"));
+    fs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
+    FileStatus jarFileStatus = fs.getFileStatus(remoteJarPath);
+
+    Map<String, LocalResource> commonLocalResources =
+        new TreeMap<String, LocalResource>();
+    LocalResource dagJarLocalRsrc = LocalResource.newInstance(
+        ConverterUtils.getYarnUrlFromPath(remoteJarPath),
+        LocalResourceType.FILE,
+        LocalResourceVisibility.APPLICATION,
+        jarFileStatus.getLen(),
+        jarFileStatus.getModificationTime());
+    commonLocalResources.put("dag_job.jar", dagJarLocalRsrc);
+
+    TezSession tezSession = null;
+    AMConfiguration amConfig = new AMConfiguration("default", null,
+        commonLocalResources, tezConf, null);
     if (useTezSession) {
-      LOG.info("Submitting DAG to Tez Session");
-      dagClient = tezSession.submitDAG(dag);
-      LOG.info("Submitted DAG to Tez Session");
-    } else {
-      LOG.info("Submitting DAG as a new Tez Application");
-      dagClient = tezClient.submitDAGApplication(dag, amConfig);
+      LOG.info("Creating Tez Session");
+      TezSessionConfiguration sessionConfig =
+          new TezSessionConfiguration(amConfig, tezConf);
+      tezSession = new TezSession("OrderedWordCountSession", appId,
+          sessionConfig);
+      tezSession.start();
+      LOG.info("Created Tez Session");
     }
 
     DAGStatus dagStatus = null;
     try {
-      while (true) {
-        dagStatus = dagClient.getDAGStatus();
-        if(dagStatus.getState() == DAGStatus.State.RUNNING ||
-            dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
-            dagStatus.getState() == DAGStatus.State.FAILED ||
-            dagStatus.getState() == DAGStatus.State.KILLED ||
-            dagStatus.getState() == DAGStatus.State.ERROR) {
-          break;
+      for (int dagIndex = 1; dagIndex <= inputPaths.size(); ++dagIndex) {
+        if (dagIndex != 1
+            && interJobSleepTimeout > 0) {
+          try {
+            LOG.info("Sleeping between jobs, sleepInterval="
+                + (interJobSleepTimeout/1000));
+            Thread.sleep(interJobSleepTimeout);
+          } catch (InterruptedException e) {
+            LOG.info("Main thread interrupted. Breaking out of job loop");
+            break;
+          }
         }
-        try {
-          Thread.sleep(500);
-        } catch (InterruptedException e) {
-          // continue;
+
+        String inputPath = inputPaths.get(dagIndex-1);
+        String outputPath = outputPaths.get(dagIndex-1);
+
+        if (fs.exists(new Path(outputPath))) {
+          throw new FileAlreadyExistsException("Output directory "
+              + outputPath + " already exists");
+        }
+        LOG.info("Running OrderedWordCount DAG"
+            + ", dagIndex=" + dagIndex
+            + ", inputPath=" + inputPath
+            + ", outputPath=" + outputPath);
+
+        DAG dag = createDAG(fs, conf, commonLocalResources, stagingDir,
+            dagIndex, inputPath, outputPath);
+
+        DAGClient dagClient;
+        if (useTezSession) {
+          LOG.info("Submitting DAG to Tez Session, dagIndex=" + dagIndex);
+          dagClient = tezSession.submitDAG(dag);
+          LOG.info("Submitted DAG to Tez Session, dagIndex=" + dagIndex);
+        } else {
+          LOG.info("Submitting DAG as a new Tez Application");
+          dagClient = tezClient.submitDAGApplication(dag, amConfig);
         }
-      }
 
-      while (dagStatus.getState() == DAGStatus.State.RUNNING) {
-        try {
-          ExampleDriver.printMRRDAGStatus(dagStatus);
+        while (true) {
+          dagStatus = dagClient.getDAGStatus();
+          if(dagStatus.getState() == DAGStatus.State.RUNNING ||
+              dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
+              dagStatus.getState() == DAGStatus.State.FAILED ||
+              dagStatus.getState() == DAGStatus.State.KILLED ||
+              dagStatus.getState() == DAGStatus.State.ERROR) {
+            break;
+          }
           try {
-            Thread.sleep(1000);
+            Thread.sleep(500);
           } catch (InterruptedException e) {
             // continue;
           }
-          dagStatus = dagClient.getDAGStatus();
-        } catch (TezException e) {
-          LOG.fatal("Failed to get application progress. Exiting");
-          System.exit(-1);
         }
+
+        while (dagStatus.getState() == DAGStatus.State.RUNNING) {
+          try {
+            ExampleDriver.printMRRDAGStatus(dagStatus);
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException e) {
+              // continue;
+            }
+            dagStatus = dagClient.getDAGStatus();
+          } catch (TezException e) {
+            LOG.fatal("Failed to get application progress. Exiting");
+            System.exit(-1);
+          }
+        }
+        ExampleDriver.printMRRDAGStatus(dagStatus);
+        LOG.info("DAG " + dagIndex + " completed. "
+            + "FinalState=" + dagStatus.getState());
       }
     } finally {
       fs.delete(stagingDir, true);
@@ -367,9 +416,11 @@ public class OrderedWordCount {
       }
     }
 
-    ExampleDriver.printMRRDAGStatus(dagStatus);
-    LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
-    System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
+    if (!useTezSession) {
+      ExampleDriver.printMRRDAGStatus(dagStatus);
+      LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
+      System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
index 2b9a35f..274d150 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
@@ -78,6 +78,7 @@ public class TestMRRJobs {
   @BeforeClass
   public static void setup() throws IOException {
     try {
+      conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
       conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
       dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
         .format(true).racks(null).build();
@@ -109,11 +110,11 @@ public class TestMRRJobs {
     LOG.info("APP_JAR_HDFS: " + APP_JAR_HDFS);
     LOG.info("YARN_SITE_XML: " + YARN_SITE_XML);
     LOG.info("YARN_SITE_XML_HDFS: " + YARN_SITE_XML_HDFS);
-    
+
     localFs.copyFromLocalFile(new Path(MiniMRRTezCluster.APPJAR), APP_JAR);
     localFs.setPermission(APP_JAR, new FsPermission("700"));
     localFs.copyFromLocalFile(mrrTezCluster.getConfigFilePath(), YARN_SITE_XML);
-    
+
     remoteFs.copyFromLocalFile(new Path(MiniMRRTezCluster.APPJAR), APP_JAR_HDFS);
     remoteFs.copyFromLocalFile(mrrTezCluster.getConfigFilePath(), YARN_SITE_XML_HDFS);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 7280a1f..1c66deb 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -202,13 +203,78 @@ public class TestMRRJobsDAGApi {
 
   // Submits a DAG to AM via RPC after AM has started
   @Test(timeout = 60000)
-  public void testMRRSleepJobPlanViaRPC() throws IOException,
+  public void testMRRSleepJobViaSession() throws IOException,
   InterruptedException, TezException, ClassNotFoundException, YarnException {
     State finalState = testMRRSleepJobDagSubmitCore(true, false, false);
 
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
   }
 
+  // Submits two DAGs, one after the other, to the same session AM via RPC
+  @Test(timeout = 120000)
+  public void testMultipleMRRSleepJobViaSession() throws IOException,
+  InterruptedException, TezException, ClassNotFoundException, YarnException {
+    Map<String, String> commonEnv = createCommonEnv();
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
+        .valueOf(new Random().nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    TezConfiguration tezConf = new TezConfiguration(
+        mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        remoteStagingDir.toString());
+    LocalResource appJarLr = createLocalResource(remoteFs,
+        remoteFs.makeQualified(APP_JAR_HDFS), LocalResourceType.FILE,
+        LocalResourceVisibility.APPLICATION);
+    LocalResource yarnSiteLr = createLocalResource(remoteFs,
+        remoteFs.makeQualified(YARN_SITE_XML_HDFS), LocalResourceType.FILE,
+        LocalResourceVisibility.APPLICATION);
+    Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
+    commonLocalResources.put(APP_JAR.getName(), appJarLr);
+
+    Map<String, LocalResource> amLocalResources =
+        new HashMap<String, LocalResource>();
+    amLocalResources.put("yarn-site.xml", yarnSiteLr);
+    amLocalResources.putAll(commonLocalResources);
+
+    AMConfiguration amConfig = new AMConfiguration(
+        "default", commonEnv, amLocalResources,
+        tezConf, null);
+    TezSessionConfiguration tezSessionConfig =
+        new TezSessionConfiguration(amConfig, tezConf);
+    TezSession tezSession = new TezSession("testsession", tezSessionConfig);
+    tezSession.start();
+
+    State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+        tezSession);
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+    finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+        tezSession);
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+    ApplicationId appId = tezSession.getApplicationId();
+    tezSession.stop();
+    YarnClient yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(mrrTezCluster.getConfig());
+    yarnClient.start();
+
+    while (true) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (appReport.getYarnApplicationState().equals(
+          YarnApplicationState.FINISHED)
+          || appReport.getYarnApplicationState().equals(
+              YarnApplicationState.FAILED)
+          || appReport.getYarnApplicationState().equals(
+              YarnApplicationState.KILLED)) {
+        break;
+      }
+    }
+
+    ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+    Assert.assertEquals(YarnApplicationState.FINISHED,
+        appReport.getYarnApplicationState());
+    Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
+        appReport.getFinalApplicationStatus());
+  }
+
   // Submits a simple 5 stage sleep job using tez session. Then kills it.
   @Test(timeout = 60000)
   public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException,
@@ -231,7 +297,27 @@ public class TestMRRJobsDAGApi {
       boolean dagViaRPC,
       boolean killDagWhileRunning,
       boolean closeSessionBeforeSubmit) throws IOException,
-      InterruptedException, TezException, ClassNotFoundException, YarnException {
+      InterruptedException, TezException, ClassNotFoundException,
+      YarnException {
+    return testMRRSleepJobDagSubmitCore(dagViaRPC, killDagWhileRunning,
+        closeSessionBeforeSubmit, null);
+  }
+
+  private Map<String, String> createCommonEnv() {
+    Map<String, String> commonEnv = new HashMap<String, String>();
+    Apps.addToEnvironment(commonEnv, Environment.CLASSPATH.name(), ".");
+    Apps.addToEnvironment(commonEnv, Environment.CLASSPATH.name(),
+        System.getProperty("java.class.path"));
+    return commonEnv;
+  }
+
+  public State testMRRSleepJobDagSubmitCore(
+      boolean dagViaRPC,
+      boolean killDagWhileRunning,
+      boolean closeSessionBeforeSubmit,
+      TezSession reUseTezSession) throws IOException,
+      InterruptedException, TezException, ClassNotFoundException,
+      YarnException {
     LOG.info("\n\n\nStarting testMRRSleepJobDagSubmit().");
 
     if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
@@ -325,11 +411,7 @@ public class TestMRRJobsDAGApi {
     Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
     commonLocalResources.put(APP_JAR.getName(), appJarLr);
 
-    Map<String, String> commonEnv = new HashMap<String, String>();
-    // TODO Use utility method post TEZ-205.
-    Apps.addToEnvironment(commonEnv, Environment.CLASSPATH.name(), ".");
-    Apps.addToEnvironment(commonEnv, Environment.CLASSPATH.name(),
-        System.getProperty("java.class.path"));
+    Map<String, String> commonEnv = createCommonEnv();
 
     // TODO Use utility method post TEZ-205.
     Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
@@ -368,7 +450,7 @@ public class TestMRRJobsDAGApi {
         OnFileSortedOutput.class.getName()), new InputDescriptor(
                 ShuffledMergedInputLegacy.class.getName())));
     Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty(
-        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, 
+        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
         SchedulingType.SEQUENTIAL, new OutputDescriptor(
         OnFileSortedOutput.class.getName()), new InputDescriptor(
                 ShuffledMergedInputLegacy.class.getName())));
@@ -389,6 +471,7 @@ public class TestMRRJobsDAGApi {
     TezClient tezClient = new TezClient(tezConf);
     DAGClient dagClient = null;
     TezSession tezSession = null;
+    boolean reuseSession = reUseTezSession != null;
     TezSessionConfiguration tezSessionConfig;
     AMConfiguration amConfig = new AMConfiguration(
         "default", commonEnv, amLocalResources,
@@ -397,9 +480,13 @@ public class TestMRRJobsDAGApi {
       // TODO Use utility method post TEZ-205 to figure out AM arguments etc.
       dagClient = tezClient.submitDAGApplication(dag, amConfig);
     } else {
-      tezSessionConfig = new TezSessionConfiguration(amConfig, tezConf);
-      tezSession = new TezSession("testsession", tezSessionConfig);
-      tezSession.start();
+      if (reuseSession) {
+        tezSession = reUseTezSession;
+      } else {
+        tezSessionConfig = new TezSessionConfiguration(amConfig, tezConf);
+        tezSession = new TezSession("testsession", tezSessionConfig);
+        tezSession.start();
+      }
     }
 
     if (dagViaRPC && closeSessionBeforeSubmit) {
@@ -429,7 +516,7 @@ public class TestMRRJobsDAGApi {
                 + ", finalAppStatus=" + appReport.getFinalApplicationStatus());
             Assert.assertEquals(YarnApplicationState.FINISHED,
                 appState);
-            Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
+            Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
                 appReport.getFinalApplicationStatus());
             break;
           }
@@ -460,6 +547,9 @@ public class TestMRRJobsDAGApi {
       }
       dagStatus = dagClient.getDAGStatus();
     }
+    if (dagViaRPC && !reuseSession) {
+      tezSession.stop();
+    }
     return dagStatus.getState();
   }
 


Mime
View raw message