tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject [2/2] git commit: TEZ-341. Allow DAG to be submitted to an AM after it has started. (bikas and hitesh via hitesh)
Date Wed, 21 Aug 2013 02:12:30 GMT
TEZ-341. Allow DAG to be submitted to an AM after it has started. (bikas and hitesh via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/9640b6f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/9640b6f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/9640b6f4

Branch: refs/heads/master
Commit: 9640b6f40bbd832e3089ef8e27a1bae37452c26b
Parents: a890571
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Aug 20 19:11:49 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Tue Aug 20 19:11:49 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/client/TezClient.java   | 622 +++++++------------
 .../org/apache/tez/client/TezClientUtils.java   | 562 +++++++++++++++++
 .../java/org/apache/tez/client/TezSession.java  |  55 ++
 .../main/java/org/apache/tez/dag/api/DAG.java   |  11 +-
 .../org/apache/tez/dag/api/TezConstants.java    |  29 +
 .../apache/tez/dag/api/client/DAGClient.java    |  20 +-
 .../dag/api/client/rpc/DAGClientRPCImpl.java    |   8 +-
 .../src/main/proto/DAGClientAMProtocol.proto    |  17 +-
 ...DAGClientAMProtocolBlockingPBServerImpl.java |  33 +-
 .../java/org/apache/tez/dag/app/AppContext.java |   4 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 155 +++--
 .../dag/app/TaskAttemptListenerImpTezDag.java   |   6 +-
 .../java/org/apache/tez/dag/app/dag/DAG.java    |   2 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  17 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   4 +-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |   2 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   3 +-
 .../dag/app/speculate/DefaultSpeculator.java    |  16 +-
 .../speculate/LegacyTaskRuntimeEstimator.java   |   6 +-
 .../dag/app/speculate/StartEndTimesBase.java    |  12 +-
 .../app/speculate/TaskSpeculationPredicate.java |   2 +-
 .../tez/dag/history/HistoryEventHandler.java    |   4 +-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |   6 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |   4 +-
 .../tez/dag/app/rm/TestContainerReuse.java      |   8 +-
 .../tez/mapreduce/examples/MRRSleepJob.java     |   5 +-
 .../mapreduce/examples/OrderedWordCount.java    | 309 +++++++--
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 147 ++++-
 .../org/apache/tez/mapreduce/YARNRunner.java    |   1 +
 29 files changed, 1448 insertions(+), 622 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
index d7e2d33..636fe3c 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -18,76 +18,42 @@
 
 package org.apache.tez.client;
 
-import java.io.File;
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
 import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.Apps;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.log4j.Level;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
-import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
-import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+
+import com.google.protobuf.ServiceException;
 
 public class TezClient {
   private static final Log LOG = LogFactory.getLog(TezClient.class);
 
-  final public static FsPermission TEZ_AM_DIR_PERMISSION =
-      FsPermission.createImmutable((short) 0700); // rwx--------
-  final public static FsPermission TEZ_AM_FILE_PERMISSION =
-      FsPermission.createImmutable((short) 0644); // rw-r--r--
-
-  public static final int UTF8_CHUNK_SIZE = 16 * 1024;
-
   private final TezConfiguration conf;
   private YarnClient yarnClient;
+  Map<String, LocalResource> tezJarResources = null;
 
   /**
    * <p>
@@ -112,7 +78,7 @@ public class TezClient {
 
   /**
    * Submit a Tez DAG to YARN as an application. The job will be submitted to
-   * the yarn cluster or tez service which was specified when creating this
+   * the yarn cluster which was specified when creating this
    * {@link TezClient} instance.
    *
    * @param dag
@@ -144,7 +110,45 @@ public class TezClient {
       TezConfiguration amConf) throws IOException, TezException {
     ApplicationId appId = createApplication();
     return submitDAGApplication(appId, dag, appStagingDir, ts, amQueueName,
-        amArgs, amEnv, amLocalResources, amConf);
+        dag.getName(), amArgs, amEnv, amLocalResources, amConf);
+  }
+
+  /**
+   * Submit a Tez DAG to YARN as an application. The job will be submitted to
+   * the yarn cluster which was specified when creating this
+   * {@link TezClient} instance. The AM will wait for the <code>DAG</code> to
+   * be submitted via RPC.
+   *
+   * @param amName
+   *          Name of the application
+   * @param appStagingDir
+   *          FileSystem path in which resources will be copied
+   * @param ts
+   *          Application credentials
+   * @param amQueueName
+   *          Queue to which the application will be submitted
+   * @param amArgs
+   *          Command line Java arguments for the ApplicationMaster
+   * @param amEnv
+   *          Environment to be added to the ApplicationMaster
+   * @param amLocalResources
+   *          YARN local resource for the ApplicationMaster
+   * @param conf
+   *          Configuration for the Tez DAG AM. tez configuration keys from this
+   *          config will be used when running the AM. Look at
+   *          {@link TezConfiguration} for keys. This can be null if no DAG AM
+   *          parameters need to be changed.
+   * @return <code>ApplicationId</code> of the submitted Tez application
+   * @throws IOException
+   * @throws TezException
+   */
+  public DAGClient submitDAGApplication(String amName, Path appStagingDir,
+      Credentials ts, String amQueueName, List<String> amArgs,
+      Map<String, String> amEnv, Map<String, LocalResource> amLocalResources,
+      TezConfiguration amConf) throws IOException, TezException {
+    ApplicationId appId = createApplication();
+    return submitDAGApplication(appId, null, appStagingDir, ts, amQueueName,
+        amName, amArgs, amEnv, amLocalResources, amConf);
   }
 
   /**
@@ -179,14 +183,15 @@ public class TezClient {
    */
   @Private
   public DAGClient submitDAGApplication(ApplicationId appId, DAG dag,
-      Path appStagingDir, Credentials ts, String amQueueName,
+      Path appStagingDir, Credentials ts, String amQueueName, String amName,
       List<String> amArgs, Map<String, String> amEnv,
       Map<String, LocalResource> amLocalResources, TezConfiguration amConf)
       throws IOException, TezException {
     try {
-      ApplicationSubmissionContext appContext = createApplicationSubmissionContext(
-          appId, dag, appStagingDir, ts, amQueueName, dag.getName(), amArgs,
-          amEnv, amLocalResources, amConf);
+      ApplicationSubmissionContext appContext =
+          TezClientUtils.createApplicationSubmissionContext(
+          conf, appId, dag, appStagingDir, ts, amQueueName, amName, amArgs,
+          amEnv, amLocalResources, amConf, getTezJarResources());
       yarnClient.submitApplication(appContext);
     } catch (YarnException e) {
       throw new TezException(e);
@@ -210,395 +215,202 @@ public class TezClient {
     }
   }
 
+  private synchronized Map<String, LocalResource> getTezJarResources()
+      throws IOException {
+    if (tezJarResources == null) {
+      tezJarResources = TezClientUtils.setupTezJarsLocalResources(conf);
+    }
+    return tezJarResources;
+  }
+
   @Private
   public DAGClient getDAGClient(ApplicationId appId)
       throws IOException, TezException {
-      return new DAGClientRPCImpl(appId, getDefaultTezDAGID(appId), conf);
+      return new DAGClientRPCImpl(appId, getDefaultTezDAGID(appId),
+                                   conf);
   }
 
-  private void addLog4jSystemProperties(String logLevel, List<String> vargs) {
-    vargs.add("-Dlog4j.configuration="
-        + TezConfiguration.TEZ_CONTAINER_LOG4J_PROPERTIES_FILE);
-    vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "="
-        + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
-    vargs.add("-D" + TezConfiguration.TEZ_ROOT_LOGGER_NAME + "=" + logLevel
-        + "," + TezConfiguration.TEZ_CONTAINER_LOGGER_NAME);
-  }
-
-  public FileSystem ensureExists(Path stagingArea)
-      throws IOException {
-    FileSystem fs = stagingArea.getFileSystem(conf);
-    String realUser;
-    String currentUser;
-    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
-    realUser = ugi.getShortUserName();
-    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
-    if (fs.exists(stagingArea)) {
-      FileStatus fsStatus = fs.getFileStatus(stagingArea);
-      String owner = fsStatus.getOwner();
-      if (!(owner.equals(currentUser) || owner.equals(realUser))) {
-        throw new IOException("The ownership on the staging directory "
-            + stagingArea + " is not as expected. " + "It is owned by " + owner
-            + ". The directory must " + "be owned by the submitter "
-            + currentUser + " or " + "by " + realUser);
-      }
-      if (!fsStatus.getPermission().equals(TEZ_AM_DIR_PERMISSION)) {
-        LOG.info("Permissions on staging directory " + stagingArea + " are "
-            + "incorrect: " + fsStatus.getPermission()
-            + ". Fixing permissions " + "to correct value "
-            + TEZ_AM_DIR_PERMISSION);
-        fs.setPermission(stagingArea, TEZ_AM_DIR_PERMISSION);
-      }
-    } else {
-      fs.mkdirs(stagingArea, new FsPermission(TEZ_AM_DIR_PERMISSION));
-    }
-    return fs;
+  // DO NOT CHANGE THIS. This code is replicated from TezDAGID.java
+  private static final char SEPARATOR = '_';
+  private static final String DAG = "dag";
+  private static final NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(6);
   }
 
-  private LocalResource createLocalResource(FileSystem fs, Path p,
-      LocalResourceType type) throws IOException {
-    LocalResource rsrc = Records.newRecord(LocalResource.class);
-    FileStatus rsrcStat = fs.getFileStatus(p);
-    rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs.resolvePath(rsrcStat
-        .getPath())));
-    rsrc.setSize(rsrcStat.getLen());
-    rsrc.setTimestamp(rsrcStat.getModificationTime());
-    rsrc.setType(type);
-    rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-    return rsrc;
+  String getDefaultTezDAGID(ApplicationId appId) {
+     return (new StringBuilder(DAG)).append(SEPARATOR).
+                   append(appId.getClusterTimestamp()).
+                   append(SEPARATOR).
+                   append(appId.getId()).
+                   append(SEPARATOR).
+                   append(idFormat.format(1)).toString();
   }
 
-  private Map<String, LocalResource> setupTezJarsLocalResources()
-      throws IOException {
-    Map<String, LocalResource> tezJarResources =
-        new TreeMap<String, LocalResource>();
-    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
-      return tezJarResources;
-    }
-
-    // Add tez jars to local resource
-    String[] tezJarUris = conf.getStrings(
-        TezConfiguration.TEZ_LIB_URIS);
-    if (tezJarUris == null
-        || tezJarUris.length == 0) {
-      throw new TezUncheckedException("Invalid configuration of tez jars"
-          + ", " + TezConfiguration.TEZ_LIB_URIS
-          + " is not defined in the configurartion");
-    }
-
-    for (String tezJarUri : tezJarUris) {
-      URI uri;
-      try {
-        uri = new URI(tezJarUri.trim());
-      } catch (URISyntaxException e) {
-        String message = "Invalid URI defined in configuration for"
-            + " location of TEZ jars. providedURI=" + tezJarUri;
-        LOG.error(message);
-        throw new TezUncheckedException(message, e);
-      }
-      if (!uri.isAbsolute()) {
-        String message = "Non-absolute URI defined in configuration for"
-            + " location of TEZ jars. providedURI=" + tezJarUri;
-        LOG.error(message);
-        throw new TezUncheckedException(message);
-      }
-      Path p = new Path(uri);
-      FileSystem pathfs = p.getFileSystem(conf);
-      RemoteIterator<LocatedFileStatus> iter = pathfs.listFiles(p, false);
-      while (iter.hasNext()) {
-        LocatedFileStatus fStatus = iter.next();
-        String rsrcName = fStatus.getPath().getName();
-        // FIXME currently not checking for duplicates due to quirks
-        // in assembly generation
-        if (tezJarResources.containsKey(rsrcName)) {
-          String message = "Duplicate resource found"
-              + ", resourceName=" + rsrcName
-              + ", existingPath=" +
-              tezJarResources.get(rsrcName).getResource().toString()
-              + ", newPath=" + fStatus.getPath();
-          LOG.warn(message);
-          // throw new TezUncheckedException(message);
-        }
-        tezJarResources.put(rsrcName,
-            LocalResource.newInstance(
-                ConverterUtils.getYarnUrlFromPath(fStatus.getPath()),
-                LocalResourceType.FILE,
-                LocalResourceVisibility.PUBLIC,
-                fStatus.getLen(),
-                fStatus.getModificationTime()));
-      }
+  /**
+   * Create a Tez Session. This will launch a Tez AM on the cluster and
+   * the session then can be used to submit dags to this Tez AM.
+   * @param appId Application Id
+   * @param sessionName
+   *          Name of the Session
+   * @param appStagingDir
+   *          FileSystem path in which resources will be copied
+   * @param ts
+   *          Application credentials
+   * @param amQueueName
+   *          Queue to which the application will be submitted
+   * @param amArgs
+   *          Command line Java arguments for the ApplicationMaster
+   * @param amEnv
+   *          Environment to be added to the ApplicationMaster
+   * @param amLocalResources
+   *          YARN local resource for the ApplicationMaster
+   * @param conf
+   *          Configuration for the Tez DAG AM. tez configuration keys from this
+   *          config will be used when running the AM. Look at
+   *          {@link TezConfiguration} for keys. This can be null if no DAG AM
+   *          parameters need to be changed.
+   * @return TezSession handle to submit subsequent jobs
+   * @throws IOException
+   * @throws TezException
+   */
+  public TezSession createSession(ApplicationId appId, String sessionName,
+      Path appStagingDir, Credentials ts, String amQueueName,
+      List<String> amArgs, Map<String, String> amEnv,
+      Map<String, LocalResource> amLocalResources,
+      TezConfiguration amConf) throws TezException, IOException {
+    if (appId == null) {
+      appId = createApplication();
     }
-    if (tezJarResources.isEmpty()) {
-      LOG.warn("No tez jars found in configured locations"
-          + ". Ignoring for now. Errors may occur");
+    TezSession tezSession = new TezSession(sessionName, appId);
+    LOG.info("Creating a TezSession"
+        + ", sessionName=" + sessionName
+        + ", applicationId=" + appId);
+    try {
+      ApplicationSubmissionContext appContext =
+          TezClientUtils.createApplicationSubmissionContext(
+          conf, appId, null, appStagingDir, ts, amQueueName, sessionName,
+          amArgs, amEnv, amLocalResources, amConf, getTezJarResources());
+      tezSession.setTezConfigurationLocalResource(
+          appContext.getAMContainerSpec().getLocalResources().get(
+              TezConfiguration.TEZ_PB_BINARY_CONF_NAME));
+      yarnClient.submitApplication(appContext);
+    } catch (YarnException e) {
+      throw new TezException(e);
     }
-    return tezJarResources;
+    return tezSession;
   }
 
-  private Configuration createFinalTezConfForApp(TezConfiguration amConf) {
-    Configuration conf = new Configuration(false);
-    conf.setQuietMode(true);
-
-    assert amConf != null;
-    Iterator<Entry<String, String>> iter = amConf.iterator();
-    while (iter.hasNext()) {
-      Entry<String, String> entry = iter.next();
-      // Copy all tez config parameters.
-      if (entry.getKey().startsWith(TezConfiguration.TEZ_PREFIX)) {
-        conf.set(entry.getKey(), entry.getValue());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Adding tez dag am parameter: " + entry.getKey()
-              + ", with value: " + entry.getValue());
-        }
-      }
-    }
-    return conf;
+  /**
+   * Create a Tez Session. This will launch a Tez AM on the cluster and
+   * the session then can be used to submit dags to this Tez AM.
+   * @param sessionName
+   *          Name of the Session
+   * @param appStagingDir
+   *          FileSystem path in which resources will be copied
+   * @param ts
+   *          Application credentials
+   * @param amQueueName
+   *          Queue to which the application will be submitted
+   * @param amArgs
+   *          Command line Java arguments for the ApplicationMaster
+   * @param amEnv
+   *          Environment to be added to the ApplicationMaster
+   * @param amLocalResources
+   *          YARN local resource for the ApplicationMaster
+   * @param conf
+   *          Configuration for the Tez DAG AM. tez configuration keys from this
+   *          config will be used when running the AM. Look at
+   *          {@link TezConfiguration} for keys. This can be null if no DAG AM
+   *          parameters need to be changed.
+   * @return TezSession handle to submit subsequent jobs
+   * @throws IOException
+   * @throws TezException
+   */
+  public synchronized TezSession createSession(String sessionName,
+      Path appStagingDir, Credentials ts, String amQueueName,
+      List<String> amArgs, Map<String, String> amEnv, Map<String,
+      LocalResource> amLocalResources, TezConfiguration amConf)
+          throws TezException, IOException {
+    return createSession(null, sessionName, appStagingDir, ts, amQueueName,
+        amArgs, amEnv, amLocalResources, amConf);
   }
 
-  private ApplicationSubmissionContext createApplicationSubmissionContext(
-      ApplicationId appId, DAG dag, Path appStagingDir, Credentials ts,
-      String amQueueName, String amName, List<String> amArgs,
-      Map<String, String> amEnv, Map<String, LocalResource> amLocalResources,
-      TezConfiguration appConf) throws IOException, YarnException {
-
-    if (appConf == null) {
-      appConf = new TezConfiguration();
-    }
-
-    FileSystem fs = ensureExists(appStagingDir);
-
-    // Setup resource requirements
-    Resource capability = Records.newRecord(Resource.class);
-    capability.setMemory(
-        conf.getInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
-            TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT));
-    capability.setVirtualCores(
-        conf.getInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES,
-            TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT));
-    LOG.debug("AppMaster capability = " + capability);
-
-    ByteBuffer securityTokens = null;
-    // Setup security tokens
-    if (ts != null) {
-      DataOutputBuffer dob = new DataOutputBuffer();
-      ts.writeTokenStorageToStream(dob);
-      securityTokens = ByteBuffer.wrap(dob.getData(), 0,
-          dob.getLength());
-    }
-
-    // Setup the command to run the AM
-    List<String> vargs = new ArrayList<String>(8);
-    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
-
-    String amLogLevel = conf.get(TezConfiguration.TEZ_AM_LOG_LEVEL,
-                                 TezConfiguration.TEZ_AM_LOG_LEVEL_DEFAULT);
-    addLog4jSystemProperties(amLogLevel, vargs);
-
-    if (amArgs != null) {
-      vargs.addAll(amArgs);
-    }
-
-    vargs.add(TezConfiguration.TEZ_APPLICATION_MASTER_CLASS);
-    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
-        File.separator + ApplicationConstants.STDOUT);
-    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
-        File.separator + ApplicationConstants.STDERR);
-
-
-    Vector<String> vargsFinal = new Vector<String>(8);
-    // Final command
-    StringBuilder mergedCommand = new StringBuilder();
-    for (CharSequence str : vargs) {
-      mergedCommand.append(str).append(" ");
-    }
-    vargsFinal.add(mergedCommand.toString());
-
-    LOG.debug("Command to launch container for ApplicationMaster is : "
-        + mergedCommand);
-
-    // Setup the CLASSPATH in environment
-    // i.e. add { Hadoop jars, job jar, CWD } to classpath.
-    Map<String, String> environment = new HashMap<String, String>();
-
-    boolean isMiniCluster =
-        conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false);
-    if (isMiniCluster) {
-      Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
-          System.getProperty("java.class.path"));
-    }
-
-    Apps.addToEnvironment(environment,
-        Environment.CLASSPATH.name(),
-        Environment.PWD.$());
-
-    Apps.addToEnvironment(environment,
-        Environment.CLASSPATH.name(),
-        Environment.PWD.$() + File.separator + "*");
-
-    // Add YARN/COMMON/HDFS jars to path
-    if (!isMiniCluster) {
-      for (String c : conf.getStrings(
-          YarnConfiguration.YARN_APPLICATION_CLASSPATH,
-          YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
-        Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
-            c.trim());
-      }
-    }
-
-    if (amEnv != null) {
-      for (Map.Entry<String, String> entry : amEnv.entrySet()) {
-        Apps.addToEnvironment(environment, entry.getKey(), entry.getValue());
-      }
-    }
-
-    Map<String, LocalResource> localResources =
-        new TreeMap<String, LocalResource>();
-
-    if (amLocalResources != null) {
-      localResources.putAll(amLocalResources);
-    }
-
-    Map<String, LocalResource> tezJarResources =
-        setupTezJarsLocalResources();
-    localResources.putAll(tezJarResources);
+  private DAGClientAMProtocolBlockingPB getAMProxy(
+      ApplicationId applicationId) throws TezException, IOException {
+    return TezClientUtils.getAMProxy(yarnClient, conf, applicationId);
+  }
 
+  public synchronized DAGClient submitDAG(TezSession tezSession, DAG dag)
+      throws IOException, TezException {
+    String dagId = null;
+    LOG.info("Submitting dag to TezSession"
+        + ", sessionName=" + tezSession.getSessionName()
+        + ", applicationId=" + tezSession.getApplicationId());
     // Add tez jars to vertices too
     for (Vertex v : dag.getVertices()) {
-      v.getTaskLocalResources().putAll(tezJarResources);
+      v.getTaskLocalResources().putAll(getTezJarResources());
+      if (null != tezSession.getTezConfigurationLocalResource()) {
+        v.getTaskLocalResources().put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
+            tezSession.getTezConfigurationLocalResource());
+      }
     }
-
-    // emit conf as PB file
-    Configuration finalTezConf = createFinalTezConfForApp(appConf);
-    Path binaryConfPath =  new Path(appStagingDir,
-        TezConfiguration.TEZ_PB_BINARY_CONF_NAME + "." + appId.toString());
-    FSDataOutputStream amConfPBOutBinaryStream = null;
-    try {
-      ConfigurationProto.Builder confProtoBuilder = 
-          ConfigurationProto.newBuilder();
-      Iterator<Entry<String, String>> iter = finalTezConf.iterator();
-      while (iter.hasNext()) {
-        Entry<String, String> entry = iter.next();
-        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
-        kvp.setKey(entry.getKey());
-        kvp.setValue(entry.getValue());
-        confProtoBuilder.addConfKeyValues(kvp);
+    DAGPlan dagPlan = dag.createDag(null);
+    SubmitDAGRequestProto requestProto =
+        SubmitDAGRequestProto.newBuilder().setDAGPlan(dagPlan).build();
+
+    DAGClientAMProtocolBlockingPB proxy;
+    while (true) {
+      proxy = getAMProxy(tezSession.getApplicationId());
+      if (proxy != null) {
+        break;
       }
-      //binary output
-      amConfPBOutBinaryStream = FileSystem.create(fs, binaryConfPath,
-          new FsPermission(TEZ_AM_FILE_PERMISSION));
-      confProtoBuilder.build().writeTo(amConfPBOutBinaryStream);      
-    } finally {
-      if(amConfPBOutBinaryStream != null){
-        amConfPBOutBinaryStream.close();
+      try {
+        Thread.sleep(100l);
+      } catch (InterruptedException e) {
+        // Ignore
       }
     }
-    LocalResource binaryConfLr = createLocalResource(fs,
-        binaryConfPath, LocalResourceType.FILE);
-    localResources.put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME, binaryConfLr);
-    
-    // Add tez conf to vertices too
-    for (Vertex v : dag.getVertices()) {
-      v.getTaskLocalResources().put(
-          TezConfiguration.TEZ_PB_BINARY_CONF_NAME, binaryConfLr);
-    }
-    
-    DAGPlan dagPB = dag.createDag(null);
-    // emit protobuf DAG file style
-    Path binaryDAGPath =  new Path(appStagingDir,
-        TezConfiguration.TEZ_PB_PLAN_BINARY_NAME + "." + appId.toString());
-    FSDataOutputStream dagPBOutBinaryStream = null;
 
     try {
-      //binary output
-      dagPBOutBinaryStream = FileSystem.create(fs, binaryDAGPath,
-          new FsPermission(TEZ_AM_FILE_PERMISSION));
-      dagPB.writeTo(dagPBOutBinaryStream);
-    } finally {
-      if(dagPBOutBinaryStream != null){
-        dagPBOutBinaryStream.close();
-      }
-    }    
-
-    localResources.put(TezConfiguration.TEZ_PB_PLAN_BINARY_NAME,
-        createLocalResource(fs, binaryDAGPath, LocalResourceType.FILE));
-
-    if (Level.DEBUG.isGreaterOrEqual(Level.toLevel(amLogLevel))) {
-      Path textPath = localizeDagPlanAsText(dagPB, fs, appStagingDir, appId);
-      localResources.put(TezConfiguration.TEZ_PB_PLAN_TEXT_NAME,
-          createLocalResource(fs, textPath, LocalResourceType.FILE));
+      dagId = proxy.submitDAG(null, requestProto).getDagId();
+    } catch (ServiceException e) {
+      throw new TezException(e);
     }
+    LOG.info("Submitted dag to TezSession"
+        + ", sessionName=" + tezSession.getSessionName()
+        + ", applicationId=" + tezSession.getApplicationId()
+        + ", dagId=" + dagId);
 
-    Map<ApplicationAccessType, String> acls
-        = new HashMap<ApplicationAccessType, String>();
-
-    // Setup ContainerLaunchContext for AM container
-    ContainerLaunchContext amContainer =
-        ContainerLaunchContext.newInstance(localResources, environment,
-            vargsFinal, null, securityTokens, acls);
-
-    // Set up the ApplicationSubmissionContext
-    ApplicationSubmissionContext appContext = Records
-        .newRecord(ApplicationSubmissionContext.class);
-
-    appContext.setApplicationType(TezConfiguration.TEZ_APPLICATION_TYPE);
-    appContext.setApplicationId(appId);
-    appContext.setResource(capability);
-    appContext.setQueue(amQueueName);
-    appContext.setApplicationName(amName);
-    appContext.setCancelTokensWhenComplete(conf.getBoolean(
-        TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN,
-        TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN_DEFAULT));
-    appContext.setAMContainerSpec(amContainer);
-
-    return appContext;
+    return new DAGClientRPCImpl(tezSession.getApplicationId(), dagId, conf);
   }
 
-  private Path localizeDagPlanAsText(DAGPlan dagPB, FileSystem fs,
-      Path appStagingDir, ApplicationId appId) throws IOException {
-    Path textPath = new Path(appStagingDir,
-        TezConfiguration.TEZ_PB_PLAN_TEXT_NAME + "." + appId.toString());
-    FSDataOutputStream dagPBOutTextStream = null;
-    try {
-      dagPBOutTextStream = FileSystem.create(fs, textPath, new FsPermission(
-          TEZ_AM_FILE_PERMISSION));
-      String dagPBStr = dagPB.toString();
-      int dagPBStrLen = dagPBStr.length();
-      if (dagPBStrLen <= UTF8_CHUNK_SIZE) {
-        dagPBOutTextStream.writeUTF(dagPBStr);
-      } else {
-        int startIndex = 0;
-        while (startIndex < dagPBStrLen) {
-          int endIndex = startIndex + UTF8_CHUNK_SIZE;
-          if (endIndex > dagPBStrLen) {
-            endIndex = dagPBStrLen;
-          }
-          dagPBOutTextStream.writeUTF(dagPBStr.substring(startIndex, endIndex));
-          startIndex += UTF8_CHUNK_SIZE;
-        }
-      }
-    } finally {
-      if (dagPBOutTextStream != null) {
-        dagPBOutTextStream.close();
+  public synchronized void closeSession(TezSession tezSession)
+      throws TezException, IOException {
+    LOG.info("Closing down Tez Session"
+        + ", sessionName=" + tezSession.getSessionName()
+        + ", applicationId=" + tezSession.getApplicationId());
+    DAGClientAMProtocolBlockingPB proxy = getAMProxy(
+        tezSession.getApplicationId());
+    if (proxy != null) {
+      try {
+        ShutdownSessionRequestProto request =
+            ShutdownSessionRequestProto.newBuilder().build();
+        proxy.shutdownSession(null, request);
+        return;
+      } catch (ServiceException e) {
+        LOG.info("Failed to shutdown Tez Session via proxy", e);
       }
     }
-    return textPath;
-  }
-
-  // DO NOT CHANGE THIS. This code is replicated from TezDAGID.java
-  private static final char SEPARATOR = '_';
-  private static final String DAG = "dag";
-  private static final NumberFormat idFormat = NumberFormat.getInstance();
-  static {
-    idFormat.setGroupingUsed(false);
-    idFormat.setMinimumIntegerDigits(6);
+    LOG.info("Could not connect to AM, killing session via YARN"
+        + ", sessionName=" + tezSession.getSessionName()
+        + ", applicationId=" + tezSession.getApplicationId());
+    try {
+      yarnClient.killApplication(tezSession.getApplicationId());
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
   }
 
-  String getDefaultTezDAGID(ApplicationId appId) {
-     return (new StringBuilder(DAG)).append(SEPARATOR).
-                   append(appId.getClusterTimestamp()).
-                   append(SEPARATOR).
-                   append(appId.getId()).
-                   append(SEPARATOR).
-                   append(idFormat.format(1)).toString();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-dag-api/src/main/java/org/apache/tez/client/TezClientUtils.java
new file mode 100644
index 0000000..4dc5baf
--- /dev/null
+++ b/tez-dag-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -0,0 +1,562 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Vector;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Level;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class TezClientUtils {
+
+  private static Log LOG = LogFactory.getLog(TezClientUtils.class);
+
+  public static final FsPermission TEZ_AM_DIR_PERMISSION =
+      FsPermission.createImmutable((short) 0700); // rwx--------
+  public static final FsPermission TEZ_AM_FILE_PERMISSION =
+      FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+  private static final int UTF8_CHUNK_SIZE = 16 * 1024;
+
+  /**
+   * Setup LocalResource map for Tez jars based on provided Configuration
+   * @param conf Configuration to use to access Tez jars' locations
+   * @return Map of LocalResources to use when launching Tez AM
+   * @throws IOException
+   */
+  static Map<String, LocalResource> setupTezJarsLocalResources(
+      Configuration conf)
+      throws IOException {
+    Map<String, LocalResource> tezJarResources =
+        new TreeMap<String, LocalResource>();
+    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+      return tezJarResources;
+    }
+
+    // Add tez jars to local resource
+    String[] tezJarUris = conf.getStrings(
+        TezConfiguration.TEZ_LIB_URIS);
+    if (tezJarUris == null
+        || tezJarUris.length == 0) {
+      throw new TezUncheckedException("Invalid configuration of tez jars"
+          + ", " + TezConfiguration.TEZ_LIB_URIS
+          + " is not defined in the configurartion");
+    }
+
+    for (String tezJarUri : tezJarUris) {
+      URI uri;
+      try {
+        uri = new URI(tezJarUri.trim());
+      } catch (URISyntaxException e) {
+        String message = "Invalid URI defined in configuration for"
+            + " location of TEZ jars. providedURI=" + tezJarUri;
+        LOG.error(message);
+        throw new TezUncheckedException(message, e);
+      }
+      if (!uri.isAbsolute()) {
+        String message = "Non-absolute URI defined in configuration for"
+            + " location of TEZ jars. providedURI=" + tezJarUri;
+        LOG.error(message);
+        throw new TezUncheckedException(message);
+      }
+      Path p = new Path(uri);
+      FileSystem pathfs = p.getFileSystem(conf);
+      RemoteIterator<LocatedFileStatus> iter = pathfs.listFiles(p, false);
+      while (iter.hasNext()) {
+        LocatedFileStatus fStatus = iter.next();
+        String rsrcName = fStatus.getPath().getName();
+        // FIXME currently not checking for duplicates due to quirks
+        // in assembly generation
+        if (tezJarResources.containsKey(rsrcName)) {
+          String message = "Duplicate resource found"
+              + ", resourceName=" + rsrcName
+              + ", existingPath=" +
+              tezJarResources.get(rsrcName).getResource().toString()
+              + ", newPath=" + fStatus.getPath();
+          LOG.warn(message);
+          // throw new TezUncheckedException(message);
+        }
+        tezJarResources.put(rsrcName,
+            LocalResource.newInstance(
+                ConverterUtils.getYarnUrlFromPath(fStatus.getPath()),
+                LocalResourceType.FILE,
+                LocalResourceVisibility.PUBLIC,
+                fStatus.getLen(),
+                fStatus.getModificationTime()));
+      }
+    }
+    if (tezJarResources.isEmpty()) {
+      LOG.warn("No tez jars found in configured locations"
+          + ". Ignoring for now. Errors may occur");
+    }
+    return tezJarResources;
+  }
+
+  /**
+   * Verify or create the Staging area directory on the configured Filesystem
+   * @param stagingArea Staging area directory path
+   * @return
+   * @throws IOException
+   */
+  public static FileSystem ensureStagingDirExists(Configuration conf,
+      Path stagingArea)
+      throws IOException {
+    FileSystem fs = stagingArea.getFileSystem(conf);
+    String realUser;
+    String currentUser;
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+    realUser = ugi.getShortUserName();
+    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+    if (fs.exists(stagingArea)) {
+      FileStatus fsStatus = fs.getFileStatus(stagingArea);
+      String owner = fsStatus.getOwner();
+      if (!(owner.equals(currentUser) || owner.equals(realUser))) {
+        throw new IOException("The ownership on the staging directory "
+            + stagingArea + " is not as expected. " + "It is owned by " + owner
+            + ". The directory must " + "be owned by the submitter "
+            + currentUser + " or " + "by " + realUser);
+      }
+      if (!fsStatus.getPermission().equals(TEZ_AM_DIR_PERMISSION)) {
+        LOG.info("Permissions on staging directory " + stagingArea + " are "
+            + "incorrect: " + fsStatus.getPermission()
+            + ". Fixing permissions " + "to correct value "
+            + TEZ_AM_DIR_PERMISSION);
+        fs.setPermission(stagingArea, TEZ_AM_DIR_PERMISSION);
+      }
+    } else {
+      fs.mkdirs(stagingArea, new FsPermission(TEZ_AM_DIR_PERMISSION));
+    }
+    return fs;
+  }
+
+  /**
+   * Create an ApplicationSubmissionContext to launch a Tez AM
+   * @param conf
+   * @param appId
+   * @param dag
+   * @param appStagingDir
+   * @param ts
+   * @param amQueueName
+   * @param amName
+   * @param amArgs
+   * @param amEnv
+   * @param amLocalResources
+   * @param appConf
+   * @return
+   * @throws IOException
+   * @throws YarnException
+   */
+  static ApplicationSubmissionContext createApplicationSubmissionContext(
+      Configuration conf, ApplicationId appId, DAG dag, Path appStagingDir,
+      Credentials ts, String amQueueName, String amName, List<String> amArgs,
+      Map<String, String> amEnv, Map<String, LocalResource> amLocalResources,
+      TezConfiguration appConf,
+      Map<String, LocalResource> tezJarResources)
+          throws IOException, YarnException {
+
+    if (appConf == null) {
+      appConf = new TezConfiguration();
+    }
+
+    FileSystem fs = TezClientUtils.ensureStagingDirExists(conf, appStagingDir);
+
+    // Setup resource requirements
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(
+        conf.getInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
+            TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT));
+    capability.setVirtualCores(
+        conf.getInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES,
+            TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT));
+    LOG.debug("AppMaster capability = " + capability);
+
+    ByteBuffer securityTokens = null;
+    // Setup security tokens
+    if (ts != null) {
+      DataOutputBuffer dob = new DataOutputBuffer();
+      ts.writeTokenStorageToStream(dob);
+      securityTokens = ByteBuffer.wrap(dob.getData(), 0,
+          dob.getLength());
+    }
+
+    // Setup the command to run the AM
+    List<String> vargs = new ArrayList<String>(8);
+    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+
+    String amLogLevel = conf.get(TezConfiguration.TEZ_AM_LOG_LEVEL,
+                                 TezConfiguration.TEZ_AM_LOG_LEVEL_DEFAULT);
+    addLog4jSystemProperties(amLogLevel, vargs);
+
+    if (amArgs != null) {
+      vargs.addAll(amArgs);
+    }
+
+    vargs.add(TezConfiguration.TEZ_APPLICATION_MASTER_CLASS);
+    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+        File.separator + ApplicationConstants.STDOUT);
+    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+        File.separator + ApplicationConstants.STDERR);
+
+
+    Vector<String> vargsFinal = new Vector<String>(8);
+    // Final command
+    StringBuilder mergedCommand = new StringBuilder();
+    for (CharSequence str : vargs) {
+      mergedCommand.append(str).append(" ");
+    }
+    vargsFinal.add(mergedCommand.toString());
+
+    LOG.debug("Command to launch container for ApplicationMaster is : "
+        + mergedCommand);
+
+    // Setup the CLASSPATH in environment
+    // i.e. add { Hadoop jars, job jar, CWD } to classpath.
+    Map<String, String> environment = new HashMap<String, String>();
+
+    boolean isMiniCluster =
+        conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false);
+    if (isMiniCluster) {
+      Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+          System.getProperty("java.class.path"));
+    }
+
+    Apps.addToEnvironment(environment,
+        Environment.CLASSPATH.name(),
+        Environment.PWD.$());
+
+    Apps.addToEnvironment(environment,
+        Environment.CLASSPATH.name(),
+        Environment.PWD.$() + File.separator + "*");
+
+    // Add YARN/COMMON/HDFS jars to path
+    if (!isMiniCluster) {
+      for (String c : conf.getStrings(
+          YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+          YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+        Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+            c.trim());
+      }
+    }
+
+    if (amEnv != null) {
+      for (Map.Entry<String, String> entry : amEnv.entrySet()) {
+        Apps.addToEnvironment(environment, entry.getKey(), entry.getValue());
+      }
+    }
+
+    Map<String, LocalResource> localResources =
+        new TreeMap<String, LocalResource>();
+
+    if (amLocalResources != null) {
+      localResources.putAll(amLocalResources);
+    }
+    localResources.putAll(tezJarResources);
+
+    // emit conf as PB file
+    Configuration finalTezConf = createFinalTezConfForApp(appConf);
+    Path binaryConfPath =  new Path(appStagingDir,
+        TezConfiguration.TEZ_PB_BINARY_CONF_NAME + "." + appId.toString());
+    FSDataOutputStream amConfPBOutBinaryStream = null;
+    try {
+      ConfigurationProto.Builder confProtoBuilder =
+          ConfigurationProto.newBuilder();
+      Iterator<Entry<String, String>> iter = finalTezConf.iterator();
+      while (iter.hasNext()) {
+        Entry<String, String> entry = iter.next();
+        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+        kvp.setKey(entry.getKey());
+        kvp.setValue(entry.getValue());
+        confProtoBuilder.addConfKeyValues(kvp);
+      }
+      //binary output
+      amConfPBOutBinaryStream = FileSystem.create(fs, binaryConfPath,
+          new FsPermission(TEZ_AM_FILE_PERMISSION));
+      confProtoBuilder.build().writeTo(amConfPBOutBinaryStream);
+    } finally {
+      if(amConfPBOutBinaryStream != null){
+        amConfPBOutBinaryStream.close();
+      }
+    }
+
+    LocalResource binaryConfLRsrc =
+        TezClientUtils.createLocalResource(fs,
+            binaryConfPath, LocalResourceType.FILE,
+            LocalResourceVisibility.APPLICATION);
+    localResources.put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
+        binaryConfLRsrc);
+
+    if(dag != null) {
+      // Add tez jars to vertices too
+      for (Vertex v : dag.getVertices()) {
+        v.getTaskLocalResources().putAll(tezJarResources);
+        v.getTaskLocalResources().put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
+            binaryConfLRsrc);
+      }
+
+      // emit protobuf DAG file style
+      Path binaryPath =  new Path(appStagingDir,
+          TezConfiguration.TEZ_PB_PLAN_BINARY_NAME + "." + appId.toString());
+      appConf.set(TezConfiguration.TEZ_AM_PLAN_REMOTE_PATH, binaryPath.toUri()
+          .toString());
+
+      DAGPlan dagPB = dag.createDag(null);
+
+      FSDataOutputStream dagPBOutBinaryStream = null;
+
+      try {
+        //binary output
+        dagPBOutBinaryStream = FileSystem.create(fs, binaryPath,
+            new FsPermission(TEZ_AM_FILE_PERMISSION));
+        dagPB.writeTo(dagPBOutBinaryStream);
+      } finally {
+        if(dagPBOutBinaryStream != null){
+          dagPBOutBinaryStream.close();
+        }
+      }
+
+      localResources.put(TezConfiguration.TEZ_PB_PLAN_BINARY_NAME,
+          TezClientUtils.createLocalResource(fs,
+              binaryPath, LocalResourceType.FILE,
+              LocalResourceVisibility.APPLICATION));
+
+      if (Level.DEBUG.isGreaterOrEqual(Level.toLevel(amLogLevel))) {
+        Path textPath = localizeDagPlanAsText(dagPB, fs, appStagingDir, appId);
+        localResources.put(TezConfiguration.TEZ_PB_PLAN_TEXT_NAME,
+            TezClientUtils.createLocalResource(fs,
+                textPath, LocalResourceType.FILE,
+                LocalResourceVisibility.APPLICATION));
+      }
+    } else {
+      Apps.addToEnvironment(environment,
+          TezConstants.TEZ_AM_IS_SESSION_ENV, "set");
+    }
+
+    Map<ApplicationAccessType, String> acls
+        = new HashMap<ApplicationAccessType, String>();
+
+    // Setup ContainerLaunchContext for AM container
+    ContainerLaunchContext amContainer =
+        ContainerLaunchContext.newInstance(localResources, environment,
+            vargsFinal, null, securityTokens, acls);
+
+    // Set up the ApplicationSubmissionContext
+    ApplicationSubmissionContext appContext = Records
+        .newRecord(ApplicationSubmissionContext.class);
+
+    appContext.setApplicationType(TezConfiguration.TEZ_APPLICATION_TYPE);
+    appContext.setApplicationId(appId);
+    appContext.setResource(capability);
+    appContext.setQueue(amQueueName);
+    appContext.setApplicationName(amName);
+    appContext.setCancelTokensWhenComplete(conf.getBoolean(
+        TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN,
+        TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN_DEFAULT));
+    appContext.setAMContainerSpec(amContainer);
+
+    return appContext;
+  }
+
+  @VisibleForTesting
+  static void addLog4jSystemProperties(String logLevel,
+      List<String> vargs) {
+    vargs.add("-Dlog4j.configuration="
+        + TezConfiguration.TEZ_CONTAINER_LOG4J_PROPERTIES_FILE);
+    vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "="
+        + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+    vargs.add("-D" + TezConfiguration.TEZ_ROOT_LOGGER_NAME + "=" + logLevel
+        + "," + TezConfiguration.TEZ_CONTAINER_LOGGER_NAME);
+  }
+
+  static Configuration createFinalTezConfForApp(TezConfiguration amConf) {
+    Configuration conf = new Configuration(false);
+    conf.setQuietMode(true);
+
+    assert amConf != null;
+    Iterator<Entry<String, String>> iter = amConf.iterator();
+    while (iter.hasNext()) {
+      Entry<String, String> entry = iter.next();
+      // Copy all tez config parameters.
+      if (entry.getKey().startsWith(TezConfiguration.TEZ_PREFIX)) {
+        conf.set(entry.getKey(), entry.getValue());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding tez dag am parameter: " + entry.getKey()
+              + ", with value: " + entry.getValue());
+        }
+      }
+    }
+    return conf;
+  }
+
+  /**
+   * Helper function to create a YARN LocalResource
+   * @param fs FileSystem object
+   * @param p Path of resource to localize
+   * @param type LocalResource Type
+   * @return
+   * @throws IOException
+   */
+  static LocalResource createLocalResource(FileSystem fs, Path p,
+      LocalResourceType type,
+      LocalResourceVisibility visibility) throws IOException {
+    LocalResource rsrc = Records.newRecord(LocalResource.class);
+    FileStatus rsrcStat = fs.getFileStatus(p);
+    rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs.resolvePath(rsrcStat
+        .getPath())));
+    rsrc.setSize(rsrcStat.getLen());
+    rsrc.setTimestamp(rsrcStat.getModificationTime());
+    rsrc.setType(type);
+    rsrc.setVisibility(visibility);
+    return rsrc;
+  }
+
+  private static Path localizeDagPlanAsText(DAGPlan dagPB, FileSystem fs,
+      Path appStagingDir, ApplicationId appId) throws IOException {
+    Path textPath = new Path(appStagingDir,
+        TezConfiguration.TEZ_PB_PLAN_TEXT_NAME + "." + appId.toString());
+    FSDataOutputStream dagPBOutTextStream = null;
+    try {
+      dagPBOutTextStream = FileSystem.create(fs, textPath, new FsPermission(
+          TEZ_AM_FILE_PERMISSION));
+      String dagPBStr = dagPB.toString();
+      int dagPBStrLen = dagPBStr.length();
+      if (dagPBStrLen <= UTF8_CHUNK_SIZE) {
+        dagPBOutTextStream.writeUTF(dagPBStr);
+      } else {
+        int startIndex = 0;
+        while (startIndex < dagPBStrLen) {
+          int endIndex = startIndex + UTF8_CHUNK_SIZE;
+          if (endIndex > dagPBStrLen) {
+            endIndex = dagPBStrLen;
+          }
+          dagPBOutTextStream.writeUTF(dagPBStr.substring(startIndex, endIndex));
+          startIndex += UTF8_CHUNK_SIZE;
+        }
+      }
+    } finally {
+      if (dagPBOutTextStream != null) {
+        dagPBOutTextStream.close();
+      }
+    }
+    return textPath;
+  }
+
+  static DAGClientAMProtocolBlockingPB getAMProxy(YarnClient yarnClient,
+      Configuration conf,
+      ApplicationId applicationId) throws TezException, IOException {
+    ApplicationReport appReport;
+    try {
+      appReport = yarnClient.getApplicationReport(
+          applicationId);
+
+      if(appReport == null) {
+        throw new TezUncheckedException("Could not retrieve application report"
+            + " from YARN, applicationId=" + applicationId);
+      }
+      YarnApplicationState appState = appReport.getYarnApplicationState();
+      if(appState != YarnApplicationState.RUNNING) {
+        if (appState == YarnApplicationState.FINISHED
+            || appState == YarnApplicationState.KILLED
+            || appState == YarnApplicationState.FAILED) {
+          throw new TezUncheckedException("Application not running"
+              + ", applicationId=" + applicationId
+              + ", yarnApplicationState=" + appReport.getYarnApplicationState()
+              + ", finalApplicationStatus="
+              + appReport.getFinalApplicationStatus()
+              + ", trackingUrl=" + appReport.getTrackingUrl());
+        }
+        return null;
+      }
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
+    return getAMProxy(conf, appReport.getHost(), appReport.getRpcPort());
+  }
+
+  static DAGClientAMProtocolBlockingPB getAMProxy(Configuration conf,
+      String amHost, int amRpcPort) throws IOException {
+    InetSocketAddress addr = new InetSocketAddress(amHost,
+        amRpcPort);
+
+    RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class,
+        ProtobufRpcEngine.class);
+    DAGClientAMProtocolBlockingPB proxy =
+        (DAGClientAMProtocolBlockingPB) RPC.getProxy(
+            DAGClientAMProtocolBlockingPB.class, 0, addr, conf);
+    return proxy;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-dag-api/src/main/java/org/apache/tez/client/TezSession.java
new file mode 100644
index 0000000..63e5d7d
--- /dev/null
+++ b/tez-dag-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+
+public class TezSession {
+
+  private final String sessionName;
+  private final ApplicationId applicationId;
+  private LocalResource tezConfPBLRsrc = null;
+
+  public TezSession(String sessionName, ApplicationId applicationId) {
+    this.sessionName = sessionName;
+    this.applicationId = applicationId;
+  }
+
+  @Private
+  public ApplicationId getApplicationId() {
+    return applicationId;
+  }
+
+  public String getSessionName() {
+    return sessionName;
+  }
+
+  @Private
+  public LocalResource getTezConfigurationLocalResource() {
+    return tezConfPBLRsrc;
+  }
+
+  @Private
+  public void setTezConfigurationLocalResource(LocalResource tezConfPBLRsrc) {
+    this.tezConfPBLRsrc = tezConfPBLRsrc;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 576bdff..3958b95 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -102,10 +102,10 @@ public class DAG { // FIXME rename to Topology
 
     int index; //for Tarjan's algorithm
     int lowlink; //for Tarjan's algorithm
-    boolean onstack; //for Tarjan's algorithm 
-    
+    boolean onstack; //for Tarjan's algorithm
+
     int outDegree;
-    
+
     private AnnotatedVertex(Vertex v){
        this.v = v;
        index = -1;
@@ -169,7 +169,7 @@ public class DAG { // FIXME rename to Topology
     if(restricted){
       for(Edge e : edges){
         vertexMap.get(e.getInputVertex().getVertexName()).outDegree++;
-        if (e.getEdgeProperty().getConnectionPattern() != 
+        if (e.getEdgeProperty().getConnectionPattern() !=
             ConnectionPattern.BIPARTITE) {
           throw new IllegalStateException(
               "Unsupported connection pattern on edge. " + e);
@@ -347,7 +347,8 @@ public class DAG { // FIXME rename to Topology
 
     if(dagConf != null) {
       Iterator<Entry<String, String>> iter = dagConf.iterator();
-      ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
+      ConfigurationProto.Builder confProtoBuilder =
+          ConfigurationProto.newBuilder();
       while (iter.hasNext()) {
         Entry<String, String> entry = iter.next();
         PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
new file mode 100644
index 0000000..5463d65
--- /dev/null
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+/**
+ * Specifies all constant values in Tez
+ */
+public class TezConstants {
+
+  // Env variable names
+  public static final String TEZ_AM_IS_SESSION_ENV = "TEZ_AM_IS_SESSION";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
index 3f5d07a..9062e8e 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
@@ -31,37 +31,37 @@ import org.apache.tez.dag.api.TezException;
  * Application Master.
  */
 public interface DAGClient extends Closeable {
-  
+
   /**
    * Get the YARN ApplicationId for the app running the DAG
    * @return <code>ApplicationId</code>
    */
   public ApplicationId getApplicationId();
-  
+
   @Private
   /**
    * Get the YARN ApplicationReport for the app running the DAG. For performance
-   * reasons this may be stale copy and should be used to access static info. It 
+   * reasons this may be stale copy and should be used to access static info. It
    * may be null.
    * @return <code>ApplicationReport</code> or null
    */
   public ApplicationReport getApplicationReport();
-  
+
   /**
    * Get the status of the specified DAG
    */
   public DAGStatus getDAGStatus() throws IOException, TezException;
-  
+
   /**
-   * Get the status of a Vertex of a DAG 
+   * Get the status of a Vertex of a DAG
    */
   public VertexStatus getVertexStatus(String vertexName)
       throws IOException, TezException;
-  
+
   /**
    * Kill a running DAG
-   * 
+   *
    */
-  public void tryKillDAG()
-      throws TezException, IOException;
+  public void tryKillDAG() throws TezException, IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 5cf296b..dd68705 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -49,9 +49,9 @@ import com.google.protobuf.ServiceException;
 public class DAGClientRPCImpl implements DAGClient {
   private static final Log LOG = LogFactory.getLog(DAGClientRPCImpl.class);
 
-  private ApplicationId appId;
-  private String dagId;
-  private TezConfiguration conf;
+  private final ApplicationId appId;
+  private final String dagId;
+  private final TezConfiguration conf;
   private ApplicationReport appReport;
   private YarnClient yarnClient;
   private DAGClientAMProtocolBlockingPB proxy = null;
@@ -101,6 +101,7 @@ public class DAGClientRPCImpl implements DAGClient {
     return null;
   }
 
+  @Override
   public void tryKillDAG() throws TezException, IOException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("TryKill for app: " + appId + " dag:" + dagId);
@@ -285,4 +286,5 @@ public class DAGClientRPCImpl implements DAGClient {
         DAGClientAMProtocolBlockingPB.class, 0, addr, conf);
     return true;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto b/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
index dd8f3d5..6fcd1f8 100644
--- a/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
+++ b/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
@@ -37,7 +37,7 @@ message GetDAGStatusRequestProto {
 }
 
 message GetDAGStatusResponseProto {
-  optional DAGStatusProto dagStatus = 1; 
+  optional DAGStatusProto dagStatus = 1;
 }
 
 message GetVertexStatusRequestProto {
@@ -57,10 +57,25 @@ message TryKillDAGResponseProto {
   //nothing yet
 }
 
+message SubmitDAGRequestProto {
+  optional DAGPlan d_a_g_plan = 1;
+}
+
+message SubmitDAGResponseProto {
+  optional string dagId = 1;
+}
+
+message ShutdownSessionRequestProto {
+}
+
+message ShutdownSessionResponseProto {
+}
 
 service DAGClientAMProtocol {
   rpc getAllDAGs (GetAllDAGsRequestProto) returns (GetAllDAGsResponseProto);
   rpc getDAGStatus (GetDAGStatusRequestProto) returns (GetDAGStatusResponseProto);
   rpc getVertexStatus (GetVertexStatusRequestProto) returns (GetVertexStatusResponseProto);
   rpc tryKillDAG (TryKillDAGRequestProto) returns (TryKillDAGResponseProto);
+  rpc submitDAG (SubmitDAGRequestProto) returns (SubmitDAGResponseProto);
+  rpc shutdownSession (ShutdownSessionRequestProto) returns (ShutdownSessionResponseProto);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index 1d12ffe..1b6d562 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -32,8 +32,13 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequ
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusResponseProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionResponseProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGResponseProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.DAGAppMaster.DAGClientHandler;
 
 import com.google.protobuf.RpcController;
@@ -41,9 +46,9 @@ import com.google.protobuf.ServiceException;
 
 public class DAGClientAMProtocolBlockingPBServerImpl implements
     DAGClientAMProtocolBlockingPB {
-  
+
   DAGClientHandler real;
-  
+
   public DAGClientAMProtocolBlockingPBServerImpl(DAGClientHandler real) {
     this.real = real;
   }
@@ -90,7 +95,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
       throw wrapException(e);
     }
   }
-  
+
   @Override
   public TryKillDAGResponseProto tryKillDAG(RpcController controller,
       TryKillDAGRequestProto request) throws ServiceException {
@@ -102,10 +107,28 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
       throw wrapException(e);
     }
   }
-  
-  
+
+  @Override
+  public SubmitDAGResponseProto submitDAG(RpcController controller,
+      SubmitDAGRequestProto request) throws ServiceException {
+    try{
+      DAGPlan dagPlan = request.getDAGPlan();
+      String dagId = real.submitDAG(dagPlan);
+      return SubmitDAGResponseProto.newBuilder().setDagId(dagId).build();
+    } catch(TezException e) {
+      throw wrapException(e);
+    }
+  }
+
   ServiceException wrapException(Exception e){
     return new ServiceException(e);
   }
 
+  @Override
+  public ShutdownSessionResponseProto shutdownSession(RpcController arg0,
+      ShutdownSessionRequestProto arg1) throws ServiceException {
+    real.shutdownAM();
+    return ShutdownSessionResponseProto.newBuilder().build();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 5f25998..64f8965 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -46,7 +46,7 @@ public interface AppContext {
 
   ApplicationId getApplicationID();
 
-  TezDAGID getDAGID();
+  TezDAGID getCurrentDAGID();
 
   ApplicationAttemptId getApplicationAttemptId();
 
@@ -58,7 +58,7 @@ public interface AppContext {
 
   String getUser();
 
-  DAG getDAG();
+  DAG getCurrentDAG();
 
   void setDAG(DAG dag);
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 4697d38..56c89b2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -25,10 +25,10 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -66,14 +66,15 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.client.DAGClientServer;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.DAGState;
@@ -172,7 +173,7 @@ public class DAGAppMaster extends AbstractService {
   DAGClientServer clientRpcServer;
   private DAGClientHandler clientHandler;
 
-  private DAG dag;
+  private DAG currentDAG;
   private Credentials fsTokens = new Credentials(); // Filled during init
   private UserGroupInformation currentUser; // Will be setup during init
 
@@ -180,7 +181,6 @@ public class DAGAppMaster extends AbstractService {
   Map<Service, ServiceWithDependency> services =
       new LinkedHashMap<Service, ServiceWithDependency>();
 
-
   public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
       long appSubmitTime) {
@@ -296,10 +296,10 @@ public class DAGAppMaster extends AbstractService {
     switch (event.getType()) {
     case INTERNAL_ERROR:
       state = DAGAppMasterState.ERROR;
-      if(dag != null) {
+      if(currentDAG != null) {
         // notify dag to finish which will send the DAG_FINISHED event
         LOG.info("Internal Error. Notifying dags to finish.");
-        sendEvent(new DAGEvent(dag.getID(), DAGEventType.INTERNAL_ERROR));
+        sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.INTERNAL_ERROR));
       } else {
         LOG.info("Internal Error. Finishing directly as no dag is active.");
         shutdownHandler.shutdown();
@@ -379,7 +379,6 @@ public class DAGAppMaster extends AbstractService {
             taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
             currentUser.getShortUserName(),
             taskHeartbeatHandler, context);
-    ((RunningAppContext) context).setDAG(newDag);
 
     return newDag;
   } // end createDag()
@@ -524,15 +523,15 @@ public class DAGAppMaster extends AbstractService {
   }
 
   public List<String> getDiagnostics() {
-    if(dag != null) {
-      return dag.getDiagnostics();
+    if(currentDAG != null) {
+      return currentDAG.getDiagnostics();
     }
     return null;
   }
 
   public float getProgress() {
-    if(dag != null && dag.getState() == DAGState.RUNNING) {
-      return dag.getProgress();
+    if(currentDAG != null && currentDAG.getState() == DAGState.RUNNING) {
+      return currentDAG.getProgress();
     }
     return 0;
   }
@@ -540,7 +539,7 @@ public class DAGAppMaster extends AbstractService {
   private synchronized void setStateOnDAGCompletion() {
     DAGAppMasterState oldState = state;
     if(state == DAGAppMasterState.RUNNING) {
-      switch(dag.getState()) {
+      switch(currentDAG.getState()) {
       case SUCCEEDED:
         state = DAGAppMasterState.SUCCEEDED;
         break;
@@ -565,7 +564,7 @@ public class DAGAppMaster extends AbstractService {
   public class DAGClientHandler {
 
     public List<String> getAllDAGs() throws TezException {
-      return Collections.singletonList(dag.getID().toString());
+      return Collections.singletonList(currentDAG.getID().toString());
     }
 
     public DAGStatus getDAGStatus(String dagIdStr)
@@ -588,14 +587,14 @@ public class DAGAppMaster extends AbstractService {
       if(dagId == null) {
         throw new TezException("Bad dagId: " + dagIdStr);
       }
-      if(dag == null) {
+      if(currentDAG == null) {
         throw new TezException("No running dag at present");
       }
-      if(!dagId.equals(dag.getID())) {
+      if(!dagId.equals(currentDAG.getID())) {
         throw new TezException("Unknown dagId: " + dagIdStr);
       }
 
-      return dag;
+      return currentDAG;
     }
 
     public void tryKillDAG(String dagIdStr)
@@ -605,6 +604,31 @@ public class DAGAppMaster extends AbstractService {
       //send a DAG_KILL message
       sendEvent(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
     }
+
+    public synchronized String submitDAG(DAGPlan dagPlan) throws TezException {
+      if(currentDAG != null) {
+        throw new TezException("App master already running a DAG");
+      }
+      // RPC server runs in the context of the job user as it was started in
+      // the job user's UGI context
+      LOG.info("Starting DAG submitted via RPC");
+      startDAG(dagPlan);
+      return currentDAG.getID().toString();
+    }
+
+    public synchronized void shutdownAM() {
+      LOG.info("Received message to shutdown AM");
+      if (currentDAG != null
+          && !currentDAG.isComplete()) {
+        //send a DAG_KILL message
+        LOG.info("Sending a kill event to the current DAG"
+            + ", dagId=" + currentDAG.getID());
+        sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL));
+      } else {
+        LOG.info("No current running DAG, shutting down the AM");
+        shutdownHandler.shutdown();
+      }
+    }
   }
 
   private class RunningAppContext implements AppContext {
@@ -650,7 +674,7 @@ public class DAGAppMaster extends AbstractService {
     }
 
     @Override
-    public DAG getDAG() {
+    public DAG getCurrentDAG() {
       try {
         rLock.lock();
         return dag;
@@ -704,7 +728,7 @@ public class DAGAppMaster extends AbstractService {
     }
 
     @Override
-    public TezDAGID getDAGID() {
+    public TezDAGID getCurrentDAGID() {
       try {
         rLock.lock();
         if(dag != null) {
@@ -927,7 +951,7 @@ public class DAGAppMaster extends AbstractService {
     @SuppressWarnings("unchecked")
     @Override
     public void handle(DAGEvent event) {
-      ((EventHandler<DAGEvent>)context.getDAG()).handle(event);
+      ((EventHandler<DAGEvent>)context.getCurrentDAG()).handle(event);
     }
   }
 
@@ -936,7 +960,7 @@ public class DAGAppMaster extends AbstractService {
     @Override
     public void handle(TaskEvent event) {
       Task task =
-          context.getDAG().getVertex(event.getTaskID().getVertexID()).
+          context.getCurrentDAG().getVertex(event.getTaskID().getVertexID()).
               getTask(event.getTaskID());
       ((EventHandler<TaskEvent>)task).handle(event);
     }
@@ -947,7 +971,7 @@ public class DAGAppMaster extends AbstractService {
     @SuppressWarnings("unchecked")
     @Override
     public void handle(TaskAttemptEvent event) {
-      DAG dag = context.getDAG();
+      DAG dag = context.getCurrentDAG();
       Task task =
           dag.getVertex(event.getTaskAttemptID().getTaskID().getVertexID()).
               getTask(event.getTaskAttemptID().getTaskID());
@@ -961,7 +985,7 @@ public class DAGAppMaster extends AbstractService {
     @SuppressWarnings("unchecked")
     @Override
     public void handle(VertexEvent event) {
-      DAG dag = context.getDAG();
+      DAG dag = context.getCurrentDAG();
       org.apache.tez.dag.app.dag.Vertex vertex =
           dag.getVertex(event.getVertexId());
       ((EventHandler<VertexEvent>) vertex).handle(event);
@@ -1069,42 +1093,8 @@ public class DAGAppMaster extends AbstractService {
 
       dagPlan = dagPlanBuilder.build();
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Running a DAG with " + dagPlan.getVertexCount()
-            + " vertices ");
-        for (VertexPlan v : dagPlan.getVertexList()) {
-          LOG.debug("DAG has vertex " + v.getName());
-        }
-      }
-
-      Map<String, String> config = DagTypeConverters.
-          convertConfFromProto(dagPlan.getDagKeyValues());
-      if(config != null) {
-        for(Entry<String, String> entry : config.entrySet()) {
-          conf.set(entry.getKey(), entry.getValue());
-        }
-      }
+      startDAG(dagPlan);
 
-      // Job name is the same as the app name until we support multiple dags
-      // for an app later
-      appName = dagPlan.getName();
-
-      // /////////////////// Create the job itself.
-      dag = createDAG(dagPlan);
-      // End of creating the job.
-
-      // create a job event for job intialization
-      DAGEvent initDagEvent = new DAGEvent(dag.getID(), DAGEventType.DAG_INIT);
-      // Send init to the job (this does NOT trigger job execution)
-      // This is a synchronous call, not an event through dispatcher. We want
-      // job-init to be done completely here.
-      dagEventDispatcher.handle(initDagEvent);
-
-      // All components have started, start the job.
-      /** create a job-start event to get this ball rolling */
-      DAGEvent startDagEvent = new DAGEvent(dag.getID(), DAGEventType.DAG_START);
-      /** send the job-start event. this triggers the job execution. */
-      sendEvent(startDagEvent);
     } finally {
       if (dagPBBinaryStream != null) {
         dagPBBinaryStream.close();
@@ -1112,6 +1102,50 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
+  private void startDAG(DAGPlan dagPlan) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Running a DAG with " + dagPlan.getVertexCount()
+          + " vertices ");
+      for (VertexPlan v : dagPlan.getVertexList()) {
+        LOG.debug("DAG has vertex " + v.getName());
+      }
+    }
+
+    Iterator<PlanKeyValuePair> iter =
+        dagPlan.getDagKeyValues().getConfKeyValuesList().iterator();
+    while (iter.hasNext()) {
+      PlanKeyValuePair keyValPair = iter.next();
+      conf.set(keyValPair.getKey(), keyValPair.getValue());
+    }
+
+    // Job name is the same as the app name until we support multiple dags
+    // for an app later
+    appName = dagPlan.getName();
+
+    // /////////////////// Create the job itself.
+    DAG newDAG = createDAG(dagPlan);
+    startDAG(newDAG);
+  }
+
+  private void startDAG(DAG dag) {
+    currentDAG = dag;
+    // End of creating the job.
+    ((RunningAppContext) context).setDAG(currentDAG);
+
+    // create a job event for job initialization
+    DAGEvent initDagEvent = new DAGEvent(currentDAG.getID(), DAGEventType.DAG_INIT);
+    // Send init to the job (this does NOT trigger job execution)
+    // This is a synchronous call, not an event through dispatcher. We want
+    // job-init to be done completely here.
+    dagEventDispatcher.handle(initDagEvent);
+
+    // All components have started, start the job.
+    /** create a job-start event to get this ball rolling */
+    DAGEvent startDagEvent = new DAGEvent(currentDAG.getID(), DAGEventType.DAG_START);
+    /** send the job-start event. this triggers the job execution. */
+    sendEvent(startDagEvent);
+  }
+
   // TODO XXX Does this really need to be a YarnConfiguration ?
   protected static void initAndStartAppMaster(final DAGAppMaster appMaster,
       final Configuration conf, String jobUserName) throws IOException,
@@ -1127,7 +1161,12 @@ public class DAGAppMaster extends AbstractService {
       public Object run() throws Exception {
         appMaster.init(conf);
         appMaster.start();
-        appMaster.startDAG();
+        String submitDAGOverRpc = System.getenv(TezConstants.TEZ_AM_IS_SESSION_ENV);
+        if(submitDAGOverRpc == null || submitDAGOverRpc.isEmpty()) {
+          appMaster.startDAG();
+        } else {
+          LOG.info("Waiting for DAG over RPC");
+        }
         return null;
       }
     });

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 42bde6a..619a494 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -177,7 +177,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     // TODO: shouldReset is never used. See TT. Ask for Removal.
     boolean shouldReset = false;
     TezDependentTaskCompletionEvent[] events =
-        context.getDAG().
+        context.getCurrentDAG().
             getVertex(taskAttemptID.getTaskID().getVertexID()).
                 getTaskAttemptCompletionEvents(taskAttemptID, fromEventIdx, maxEvents);
 
@@ -420,7 +420,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     taskHeartbeatHandler.progressing(taskAttemptId);
     pingContainerHeartbeatHandler(taskAttemptId);
 
-    DAG job = context.getDAG();
+    DAG job = context.getCurrentDAG();
     Task task =
         job.getVertex(taskAttemptId.getTaskID().getVertexID()).
             getTask(taskAttemptId.getTaskID());
@@ -474,7 +474,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     // between polls (MRTask) implies tasks end up wasting upto 1 second doing
     // nothing. Similarly for CA_COMMIT.
 
-    DAG job = context.getDAG();
+    DAG job = context.getCurrentDAG();
     Task task =
         job.getVertex(taskAttemptId.getTaskID().getVertexID()).
             getTask(taskAttemptId.getTaskID());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 9fd0115..455b583 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -65,6 +65,8 @@ public interface DAG {
   DAGStatusBuilder getDAGStatus();
   VertexStatusBuilder getVertexStatus(String vertexName);
 
+  boolean isComplete();
+
   /**
    * @return the ACLs for this job for each type of JobACL given.
    */

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index b9c40fe..48c9c33 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1225,4 +1225,21 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       job.finished(DAGState.ERROR);
     }
   }
+
+  @Override
+  public boolean isComplete() {
+    readLock.lock();
+    try {
+      DAGState state = getState();
+      if (state.equals(DAGState.SUCCEEDED)
+          || state.equals(DAGState.FAILED)
+          || state.equals(DAGState.KILLED)
+          || state.equals(DAGState.ERROR)) {
+        return true;
+      }
+      return false;
+    } finally {
+      readLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 9db4a9e..05274da 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -519,13 +519,13 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   @Override
   public Task getTask() {
-    return appContext.getDAG()
+    return appContext.getCurrentDAG()
         .getVertex(attemptId.getTaskID().getVertexID())
         .getTask(attemptId.getTaskID());
   }
   
   Vertex getVertex() {
-    return appContext.getDAG()
+    return appContext.getCurrentDAG()
         .getVertex(attemptId.getTaskID().getVertexID());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index e83fe17..4fcb30d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -340,7 +340,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   @Override
   public Vertex getVertex() {
-    return appContext.getDAG().getVertex(taskId.getVertexID());
+    return appContext.getCurrentDAG().getVertex(taskId.getVertexID());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9640b6f4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index df46765..1bd2e63 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1421,7 +1421,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   @Override
   public DAG getDAG() {
-    return appContext.getDAG();
+    return appContext.getCurrentDAG();
   }
 
   private TezDAGID getDAGId() {
@@ -1435,6 +1435,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   public Resource getTaskResource() {
     return taskResource;
   }
+
   @VisibleForTesting
   String getProcessorName() {
     return this.processorDescriptor.getClassName();


Mime
View raw message