tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-349. Add initial support for Tez Sessions. (hitesh)
Date Fri, 23 Aug 2013 21:58:10 GMT
Updated Branches:
  refs/heads/master 3f8f5bead -> 6725cc590


TEZ-349. Add initial support for Tez Sessions. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6725cc59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6725cc59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6725cc59

Branch: refs/heads/master
Commit: 6725cc59010554f28dd7644ab61867d9d5ae5391
Parents: 3f8f5be
Author: Hitesh Shah <hitesh@apache.org>
Authored: Fri Aug 23 14:56:58 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Fri Aug 23 14:56:58 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/client/AMConfiguration.java  | 100 +++++++
 .../java/org/apache/tez/client/TezClient.java   | 300 +------------------
 .../org/apache/tez/client/TezClientUtils.java   |  66 ++--
 .../java/org/apache/tez/client/TezSession.java  | 155 +++++++++-
 .../tez/client/TezSessionConfiguration.java     |  57 ++++
 .../apache/tez/dag/api/TezConfiguration.java    |  20 +-
 .../dag/api/client/rpc/DAGClientRPCImpl.java    |   3 +-
 .../tez/mapreduce/examples/MRRSleepJob.java     |  16 +-
 .../mapreduce/examples/OrderedWordCount.java    |  32 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |  31 +-
 .../org/apache/tez/mapreduce/YARNRunner.java    |  23 +-
 11 files changed, 421 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6725cc59/tez-dag-api/src/main/java/org/apache/tez/client/AMConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/client/AMConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/client/AMConfiguration.java
new file mode 100644
index 0000000..f452c74
--- /dev/null
+++ b/tez-dag-api/src/main/java/org/apache/tez/client/AMConfiguration.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+public class AMConfiguration {
+
+  private final Path stagingDir;
+  private final String queueName;
+  private final Map<String, String> env;
+  private final Map<String, LocalResource> localResources;
+  private final TezConfiguration amConf;
+  private final Credentials credentials;
+
+  public AMConfiguration(String queueName, Map<String, String> env,
+      Map<String, LocalResource> localResources,
+      TezConfiguration conf, Credentials credentials) {
+    this.queueName = queueName;
+    if (conf != null) {
+      this.amConf = conf;
+    } else {
+      this.amConf = new TezConfiguration();
+    }
+
+    if (env != null) {
+      this.env = env;
+    } else {
+      this.env = new HashMap<String, String>(0);
+    }
+    this.localResources = localResources;
+    String stagingDirStr = amConf.get(TezConfiguration.TEZ_AM_STAGING_DIR);
+    if (stagingDirStr == null || stagingDirStr.isEmpty()) {
+      throw new TezUncheckedException("Staging directory for AM resources"
+          + " not specified in config"
+          + ", property=" + TezConfiguration.TEZ_AM_STAGING_DIR);
+    }
+    try {
+      FileSystem fs = FileSystem.get(amConf);
+      this.stagingDir = fs.resolvePath(new Path(stagingDirStr));
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    this.credentials = credentials;
+  }
+
+  public Path getStagingDir() {
+    return stagingDir;
+  }
+
+  public String getQueueName() {
+    return queueName;
+  }
+
+  public Map<String, String> getEnv() {
+    return env;
+  }
+
+  public Map<String, LocalResource> getLocalResources() {
+    return localResources;
+  }
+
+  public TezConfiguration getAMConf() {
+    return amConf;
+  }
+
+  public Credentials getCredentials() {
+    return credentials;
+  }
+
+  public void isCompatible(AMConfiguration other) {
+    // TODO implement
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6725cc59/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
index 636fe3c..df260ec 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -20,14 +20,11 @@ package org.apache.tez.client;
 
 import java.io.IOException;
 import java.text.NumberFormat;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -38,20 +35,14 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
-import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
-
-import com.google.protobuf.ServiceException;
 
 public class TezClient {
   private static final Log LOG = LogFactory.getLog(TezClient.class);
 
   private final TezConfiguration conf;
+  private final YarnConfiguration yarnConf;
   private YarnClient yarnClient;
   Map<String, LocalResource> tezJarResources = null;
 
@@ -71,132 +62,34 @@ public class TezClient {
    */
   public TezClient(TezConfiguration conf) {
     this.conf = conf;
+    this.yarnConf = new YarnConfiguration(conf);
     yarnClient = new YarnClientImpl();
-    yarnClient.init(new YarnConfiguration(conf));
+    yarnClient.init(yarnConf);
     yarnClient.start();
   }
 
-  /**
-   * Submit a Tez DAG to YARN as an application. The job will be submitted to
-   * the yarn cluster which was specified when creating this
-   * {@link TezClient} instance.
-   *
-   * @param dag
-   *          <code>DAG</code> to be submitted
-   * @param appStagingDir
-   *          FileSystem path in which resources will be copied
-   * @param ts
-   *          Application credentials
-   * @param amQueueName
-   *          Queue to which the application will be submitted
-   * @param amArgs
-   *          Command line Java arguments for the ApplicationMaster
-   * @param amEnv
-   *          Environment to be added to the ApplicationMaster
-   * @param amLocalResources
-   *          YARN local resource for the ApplicationMaster
-   * @param conf
-   *          Configuration for the Tez DAG AM. tez configuration keys from this
-   *          config will be used when running the AM. Look at
-   *          {@link TezConfiguration} for keys. This can be null if no DAG AM
-   *          parameters need to be changed.
-   * @return <code>ApplicationId</code> of the submitted Tez application
-   * @throws IOException
-   * @throws TezException
-   */
-  public DAGClient submitDAGApplication(DAG dag, Path appStagingDir,
-      Credentials ts, String amQueueName, List<String> amArgs,
-      Map<String, String> amEnv, Map<String, LocalResource> amLocalResources,
-      TezConfiguration amConf) throws IOException, TezException {
-    ApplicationId appId = createApplication();
-    return submitDAGApplication(appId, dag, appStagingDir, ts, amQueueName,
-        dag.getName(), amArgs, amEnv, amLocalResources, amConf);
-  }
 
-  /**
-   * Submit a Tez DAG to YARN as an application. The job will be submitted to
-   * the yarn cluster which was specified when creating this
-   * {@link TezClient} instance. The AM will wait for the <code>DAG</code> to
-   * be submitted via RPC.
-   *
-   * @param amName
-   *          Name of the application
-   * @param appStagingDir
-   *          FileSystem path in which resources will be copied
-   * @param ts
-   *          Application credentials
-   * @param amQueueName
-   *          Queue to which the application will be submitted
-   * @param amArgs
-   *          Command line Java arguments for the ApplicationMaster
-   * @param amEnv
-   *          Environment to be added to the ApplicationMaster
-   * @param amLocalResources
-   *          YARN local resource for the ApplicationMaster
-   * @param conf
-   *          Configuration for the Tez DAG AM. tez configuration keys from this
-   *          config will be used when running the AM. Look at
-   *          {@link TezConfiguration} for keys. This can be null if no DAG AM
-   *          parameters need to be changed.
-   * @return <code>ApplicationId</code> of the submitted Tez application
-   * @throws IOException
-   * @throws TezException
-   */
-  public DAGClient submitDAGApplication(String amName, Path appStagingDir,
-      Credentials ts, String amQueueName, List<String> amArgs,
-      Map<String, String> amEnv, Map<String, LocalResource> amLocalResources,
-      TezConfiguration amConf) throws IOException, TezException {
+  public DAGClient submitDAGApplication(DAG dag, AMConfiguration amConfig)
+      throws TezException, IOException {
     ApplicationId appId = createApplication();
-    return submitDAGApplication(appId, null, appStagingDir, ts, amQueueName,
-        amName, amArgs, amEnv, amLocalResources, amConf);
+    return submitDAGApplication(appId, dag, amConfig);
   }
 
-  /**
-   * Submit a Tez DAG to YARN with known <code>ApplicationId</code>. This is a
-   * private method and is only meant to be used within Tez for MR client
-   * backward compatibility.
-   *
-   * @param appId
-   *          - <code>ApplicationId</code> to be used
-   * @param dag
-   *          <code>DAG</code> to be submitted
-   * @param appStagingDir
-   *          FileSystem path in which resources will be copied
-   * @param ts
-   *          Application credentials
-   * @param amQueueName
-   *          Queue to which the application will be submitted
-   * @param amArgs
-   *          Command line Java arguments for the ApplicationMaster
-   * @param amEnv
-   *          Environment to be added to the ApplicationMaster
-   * @param amLocalResources
-   *          YARN local resource for the ApplicationMaster
-   * @param conf
-   *          Configuration for the Tez DAG AM. tez configuration keys from this
-   *          config will be used when running the AM. Look at
-   *          {@link TezConfiguration} for keys. This can be null if no DAG AM
-   *          parameters need to be changed.
-   * @return <code>ApplicationId</code> of the submitted Tez application
-   * @throws IOException
-   * @throws TezException
-   */
   @Private
-  public DAGClient submitDAGApplication(ApplicationId appId, DAG dag,
-      Path appStagingDir, Credentials ts, String amQueueName, String amName,
-      List<String> amArgs, Map<String, String> amEnv,
-      Map<String, LocalResource> amLocalResources, TezConfiguration amConf)
-      throws IOException, TezException {
+  // To be used only by YarnRunner
+  public DAGClient submitDAGApplication(ApplicationId appId,
+      DAG dag, AMConfiguration amConfig)
+          throws TezException, IOException {
     try {
       ApplicationSubmissionContext appContext =
-          TezClientUtils.createApplicationSubmissionContext(
-          conf, appId, dag, appStagingDir, ts, amQueueName, amName, amArgs,
-          amEnv, amLocalResources, amConf, getTezJarResources());
+          TezClientUtils.createApplicationSubmissionContext(conf, appId, dag,
+              dag.getName(), amConfig, getTezJarResources());
+      LOG.info("Submitting DAG to YARN"
+          + ", applicationId=" + appId);
       yarnClient.submitApplication(appContext);
     } catch (YarnException e) {
       throw new TezException(e);
     }
-
     return getDAGClient(appId);
   }
 
@@ -248,169 +141,4 @@ public class TezClient {
                    append(idFormat.format(1)).toString();
   }
 
-  /**
-   * Create a Tez Session. This will launch a Tez AM on the cluster and
-   * the session then can be used to submit dags to this Tez AM.
-   * @param appId Application Id
-   * @param sessionName
-   *          Name of the Session
-   * @param appStagingDir
-   *          FileSystem path in which resources will be copied
-   * @param ts
-   *          Application credentials
-   * @param amQueueName
-   *          Queue to which the application will be submitted
-   * @param amArgs
-   *          Command line Java arguments for the ApplicationMaster
-   * @param amEnv
-   *          Environment to be added to the ApplicationMaster
-   * @param amLocalResources
-   *          YARN local resource for the ApplicationMaster
-   * @param conf
-   *          Configuration for the Tez DAG AM. tez configuration keys from this
-   *          config will be used when running the AM. Look at
-   *          {@link TezConfiguration} for keys. This can be null if no DAG AM
-   *          parameters need to be changed.
-   * @return TezSession handle to submit subsequent jobs
-   * @throws IOException
-   * @throws TezException
-   */
-  public TezSession createSession(ApplicationId appId, String sessionName,
-      Path appStagingDir, Credentials ts, String amQueueName,
-      List<String> amArgs, Map<String, String> amEnv,
-      Map<String, LocalResource> amLocalResources,
-      TezConfiguration amConf) throws TezException, IOException {
-    if (appId == null) {
-      appId = createApplication();
-    }
-    TezSession tezSession = new TezSession(sessionName, appId);
-    LOG.info("Creating a TezSession"
-        + ", sessionName=" + sessionName
-        + ", applicationId=" + appId);
-    try {
-      ApplicationSubmissionContext appContext =
-          TezClientUtils.createApplicationSubmissionContext(
-          conf, appId, null, appStagingDir, ts, amQueueName, sessionName,
-          amArgs, amEnv, amLocalResources, amConf, getTezJarResources());
-      tezSession.setTezConfigurationLocalResource(
-          appContext.getAMContainerSpec().getLocalResources().get(
-              TezConfiguration.TEZ_PB_BINARY_CONF_NAME));
-      yarnClient.submitApplication(appContext);
-    } catch (YarnException e) {
-      throw new TezException(e);
-    }
-    return tezSession;
-  }
-
-  /**
-   * Create a Tez Session. This will launch a Tez AM on the cluster and
-   * the session then can be used to submit dags to this Tez AM.
-   * @param sessionName
-   *          Name of the Session
-   * @param appStagingDir
-   *          FileSystem path in which resources will be copied
-   * @param ts
-   *          Application credentials
-   * @param amQueueName
-   *          Queue to which the application will be submitted
-   * @param amArgs
-   *          Command line Java arguments for the ApplicationMaster
-   * @param amEnv
-   *          Environment to be added to the ApplicationMaster
-   * @param amLocalResources
-   *          YARN local resource for the ApplicationMaster
-   * @param conf
-   *          Configuration for the Tez DAG AM. tez configuration keys from this
-   *          config will be used when running the AM. Look at
-   *          {@link TezConfiguration} for keys. This can be null if no DAG AM
-   *          parameters need to be changed.
-   * @return TezSession handle to submit subsequent jobs
-   * @throws IOException
-   * @throws TezException
-   */
-  public synchronized TezSession createSession(String sessionName,
-      Path appStagingDir, Credentials ts, String amQueueName,
-      List<String> amArgs, Map<String, String> amEnv, Map<String,
-      LocalResource> amLocalResources, TezConfiguration amConf)
-          throws TezException, IOException {
-    return createSession(null, sessionName, appStagingDir, ts, amQueueName,
-        amArgs, amEnv, amLocalResources, amConf);
-  }
-
-  private DAGClientAMProtocolBlockingPB getAMProxy(
-      ApplicationId applicationId) throws TezException, IOException {
-    return TezClientUtils.getAMProxy(yarnClient, conf, applicationId);
-  }
-
-  public synchronized DAGClient submitDAG(TezSession tezSession, DAG dag)
-      throws IOException, TezException {
-    String dagId = null;
-    LOG.info("Submitting dag to TezSession"
-        + ", sessionName=" + tezSession.getSessionName()
-        + ", applicationId=" + tezSession.getApplicationId());
-    // Add tez jars to vertices too
-    for (Vertex v : dag.getVertices()) {
-      v.getTaskLocalResources().putAll(getTezJarResources());
-      if (null != tezSession.getTezConfigurationLocalResource()) {
-        v.getTaskLocalResources().put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
-            tezSession.getTezConfigurationLocalResource());
-      }
-    }
-    DAGPlan dagPlan = dag.createDag(null);
-    SubmitDAGRequestProto requestProto =
-        SubmitDAGRequestProto.newBuilder().setDAGPlan(dagPlan).build();
-
-    DAGClientAMProtocolBlockingPB proxy;
-    while (true) {
-      proxy = getAMProxy(tezSession.getApplicationId());
-      if (proxy != null) {
-        break;
-      }
-      try {
-        Thread.sleep(100l);
-      } catch (InterruptedException e) {
-        // Ignore
-      }
-    }
-
-    try {
-      dagId = proxy.submitDAG(null, requestProto).getDagId();
-    } catch (ServiceException e) {
-      throw new TezException(e);
-    }
-    LOG.info("Submitted dag to TezSession"
-        + ", sessionName=" + tezSession.getSessionName()
-        + ", applicationId=" + tezSession.getApplicationId()
-        + ", dagId=" + dagId);
-
-    return new DAGClientRPCImpl(tezSession.getApplicationId(), dagId, conf);
-  }
-
-  public synchronized void closeSession(TezSession tezSession)
-      throws TezException, IOException {
-    LOG.info("Closing down Tez Session"
-        + ", sessionName=" + tezSession.getSessionName()
-        + ", applicationId=" + tezSession.getApplicationId());
-    DAGClientAMProtocolBlockingPB proxy = getAMProxy(
-        tezSession.getApplicationId());
-    if (proxy != null) {
-      try {
-        ShutdownSessionRequestProto request =
-            ShutdownSessionRequestProto.newBuilder().build();
-        proxy.shutdownSession(null, request);
-        return;
-      } catch (ServiceException e) {
-        LOG.info("Failed to shutdown Tez Session via proxy", e);
-      }
-    }
-    LOG.info("Could not connect to AM, killing session via YARN"
-        + ", sessionName=" + tezSession.getSessionName()
-        + ", applicationId=" + tezSession.getApplicationId());
-    try {
-      yarnClient.killApplication(tezSession.getApplicationId());
-    } catch (YarnException e) {
-      throw new TezException(e);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6725cc59/tez-dag-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-dag-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 4dc5baf..7c6a5ed 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -98,7 +97,7 @@ public class TezClientUtils {
    * @throws IOException
    */
   static Map<String, LocalResource> setupTezJarsLocalResources(
-      Configuration conf)
+      TezConfiguration conf)
       throws IOException {
     Map<String, LocalResource> tezJarResources =
         new TreeMap<String, LocalResource>();
@@ -220,34 +219,31 @@ public class TezClientUtils {
    * @throws YarnException
    */
   static ApplicationSubmissionContext createApplicationSubmissionContext(
-      Configuration conf, ApplicationId appId, DAG dag, Path appStagingDir,
-      Credentials ts, String amQueueName, String amName, List<String> amArgs,
-      Map<String, String> amEnv, Map<String, LocalResource> amLocalResources,
-      TezConfiguration appConf,
+      Configuration conf, ApplicationId appId, DAG dag, String amName,
+      AMConfiguration amConfig,
       Map<String, LocalResource> tezJarResources)
-          throws IOException, YarnException {
+          throws IOException, YarnException{
 
-    if (appConf == null) {
-      appConf = new TezConfiguration();
-    }
-
-    FileSystem fs = TezClientUtils.ensureStagingDirExists(conf, appStagingDir);
+    FileSystem fs = TezClientUtils.ensureStagingDirExists(conf,
+        amConfig.getStagingDir());
 
     // Setup resource requirements
     Resource capability = Records.newRecord(Resource.class);
     capability.setMemory(
-        conf.getInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
+        amConfig.getAMConf().getInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
             TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT));
     capability.setVirtualCores(
-        conf.getInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES,
+        amConfig.getAMConf().getInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES,
             TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT));
-    LOG.debug("AppMaster capability = " + capability);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("AppMaster capability = " + capability);
+    }
 
     ByteBuffer securityTokens = null;
     // Setup security tokens
-    if (ts != null) {
+    if (amConfig.getCredentials() != null) {
       DataOutputBuffer dob = new DataOutputBuffer();
-      ts.writeTokenStorageToStream(dob);
+      amConfig.getCredentials().writeTokenStorageToStream(dob);
       securityTokens = ByteBuffer.wrap(dob.getData(), 0,
           dob.getLength());
     }
@@ -256,13 +252,13 @@ public class TezClientUtils {
     List<String> vargs = new ArrayList<String>(8);
     vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
 
-    String amLogLevel = conf.get(TezConfiguration.TEZ_AM_LOG_LEVEL,
-                                 TezConfiguration.TEZ_AM_LOG_LEVEL_DEFAULT);
+    String amLogLevel = amConfig.getAMConf().get(
+        TezConfiguration.TEZ_AM_LOG_LEVEL,
+        TezConfiguration.TEZ_AM_LOG_LEVEL_DEFAULT);
     addLog4jSystemProperties(amLogLevel, vargs);
 
-    if (amArgs != null) {
-      vargs.addAll(amArgs);
-    }
+    vargs.add(amConfig.getAMConf().get(TezConfiguration.TEZ_AM_JAVA_OPTS,
+        TezConfiguration.DEFAULT_TEZ_AM_JAVA_OPTS));
 
     vargs.add(TezConfiguration.TEZ_APPLICATION_MASTER_CLASS);
     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
@@ -311,8 +307,8 @@ public class TezClientUtils {
       }
     }
 
-    if (amEnv != null) {
-      for (Map.Entry<String, String> entry : amEnv.entrySet()) {
+    if (amConfig.getEnv() != null) {
+      for (Map.Entry<String, String> entry : amConfig.getEnv().entrySet()) {
         Apps.addToEnvironment(environment, entry.getKey(), entry.getValue());
       }
     }
@@ -320,14 +316,14 @@ public class TezClientUtils {
     Map<String, LocalResource> localResources =
         new TreeMap<String, LocalResource>();
 
-    if (amLocalResources != null) {
-      localResources.putAll(amLocalResources);
+    if (amConfig.getLocalResources() != null) {
+      localResources.putAll(amConfig.getLocalResources());
     }
     localResources.putAll(tezJarResources);
 
     // emit conf as PB file
-    Configuration finalTezConf = createFinalTezConfForApp(appConf);
-    Path binaryConfPath =  new Path(appStagingDir,
+    Configuration finalTezConf = createFinalTezConfForApp(amConfig.getAMConf());
+    Path binaryConfPath =  new Path(amConfig.getStagingDir(),
         TezConfiguration.TEZ_PB_BINARY_CONF_NAME + "." + appId.toString());
     FSDataOutputStream amConfPBOutBinaryStream = null;
     try {
@@ -367,10 +363,10 @@ public class TezClientUtils {
       }
 
       // emit protobuf DAG file style
-      Path binaryPath =  new Path(appStagingDir,
+      Path binaryPath =  new Path(amConfig.getStagingDir(),
           TezConfiguration.TEZ_PB_PLAN_BINARY_NAME + "." + appId.toString());
-      appConf.set(TezConfiguration.TEZ_AM_PLAN_REMOTE_PATH, binaryPath.toUri()
-          .toString());
+      amConfig.getAMConf().set(TezConfiguration.TEZ_AM_PLAN_REMOTE_PATH,
+          binaryPath.toUri().toString());
 
       DAGPlan dagPB = dag.createDag(null);
 
@@ -393,7 +389,8 @@ public class TezClientUtils {
               LocalResourceVisibility.APPLICATION));
 
       if (Level.DEBUG.isGreaterOrEqual(Level.toLevel(amLogLevel))) {
-        Path textPath = localizeDagPlanAsText(dagPB, fs, appStagingDir, appId);
+        Path textPath = localizeDagPlanAsText(dagPB, fs,
+            amConfig.getStagingDir(), appId);
         localResources.put(TezConfiguration.TEZ_PB_PLAN_TEXT_NAME,
             TezClientUtils.createLocalResource(fs,
                 textPath, LocalResourceType.FILE,
@@ -419,14 +416,15 @@ public class TezClientUtils {
     appContext.setApplicationType(TezConfiguration.TEZ_APPLICATION_TYPE);
     appContext.setApplicationId(appId);
     appContext.setResource(capability);
-    appContext.setQueue(amQueueName);
+    appContext.setQueue(amConfig.getQueueName());
     appContext.setApplicationName(amName);
-    appContext.setCancelTokensWhenComplete(conf.getBoolean(
+    appContext.setCancelTokensWhenComplete(amConfig.getAMConf().getBoolean(
         TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN,
         TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN_DEFAULT));
     appContext.setAMContainerSpec(amContainer);
 
     return appContext;
+
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6725cc59/tez-dag-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-dag-api/src/main/java/org/apache/tez/client/TezSession.java
index 63e5d7d..acf523d 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -18,38 +18,167 @@
 
 package org.apache.tez.client;
 
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
 
 public class TezSession {
 
+  private static final Log LOG = LogFactory.getLog(TezSession.class);
+
   private final String sessionName;
-  private final ApplicationId applicationId;
+  private ApplicationId applicationId;
   private LocalResource tezConfPBLRsrc = null;
+  private final TezSessionConfiguration sessionConfig;
+  private YarnClient yarnClient;
+  private Map<String, LocalResource> tezJarResources;
+  private boolean sessionStarted = false;
 
-  public TezSession(String sessionName, ApplicationId applicationId) {
+  public TezSession(String sessionName,
+      ApplicationId applicationId,
+      TezSessionConfiguration sessionConfig) {
     this.sessionName = sessionName;
+    this.sessionConfig = sessionConfig;
     this.applicationId = applicationId;
   }
 
-  @Private
-  public ApplicationId getApplicationId() {
-    return applicationId;
+  public TezSession(String sessionName,
+      TezSessionConfiguration sessionConfig) {
+    this(sessionName, null, sessionConfig);
   }
 
-  public String getSessionName() {
-    return sessionName;
+  public synchronized void start() throws TezException, IOException {
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(sessionConfig.getYarnConfiguration());
+    yarnClient.start();
+
+    tezJarResources = TezClientUtils.setupTezJarsLocalResources(
+        sessionConfig.getTezConfiguration());
+
+    try {
+      if (applicationId == null) {
+        applicationId = yarnClient.createApplication().
+            getNewApplicationResponse().getApplicationId();
+      }
+
+      ApplicationSubmissionContext appContext =
+          TezClientUtils.createApplicationSubmissionContext(
+              sessionConfig.getTezConfiguration(), applicationId,
+              null, sessionName, sessionConfig.getAMConfiguration(),
+              tezJarResources);
+      tezConfPBLRsrc = appContext.getAMContainerSpec().getLocalResources().get(
+          TezConfiguration.TEZ_PB_BINARY_CONF_NAME);
+      yarnClient.submitApplication(appContext);
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
+    sessionStarted = true;
   }
 
-  @Private
-  public LocalResource getTezConfigurationLocalResource() {
-    return tezConfPBLRsrc;
+  public synchronized DAGClient submitDAG(DAG dag)
+      throws TezException, IOException {
+    if (!sessionStarted) {
+      throw new TezUncheckedException("Session not started");
+    }
+
+    String dagId = null;
+    LOG.info("Submitting dag to TezSession"
+        + ", sessionName=" + sessionName
+        + ", applicationId=" + applicationId);
+    // Add tez jars to vertices too
+    for (Vertex v : dag.getVertices()) {
+      v.getTaskLocalResources().putAll(tezJarResources);
+      if (null != tezConfPBLRsrc) {
+        v.getTaskLocalResources().put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
+            tezConfPBLRsrc);
+      }
+    }
+    DAGPlan dagPlan = dag.createDag(sessionConfig.getTezConfiguration());
+    SubmitDAGRequestProto requestProto =
+        SubmitDAGRequestProto.newBuilder().setDAGPlan(dagPlan).build();
+
+    DAGClientAMProtocolBlockingPB proxy;
+    while (true) {
+      proxy = TezClientUtils.getAMProxy(yarnClient,
+          sessionConfig.getYarnConfiguration(), applicationId);
+      if (proxy != null) {
+        break;
+      }
+      try {
+        Thread.sleep(100l);
+      } catch (InterruptedException e) {
+        // Ignore
+      }
+    }
+
+    try {
+      dagId = proxy.submitDAG(null, requestProto).getDagId();
+    } catch (ServiceException e) {
+      throw new TezException(e);
+    }
+    LOG.info("Submitted dag to TezSession"
+        + ", sessionName=" + sessionName
+        + ", applicationId=" + applicationId
+        + ", dagId=" + dagId);
+    return new DAGClientRPCImpl(applicationId, dagId,
+        sessionConfig.getTezConfiguration());
   }
 
-  @Private
-  public void setTezConfigurationLocalResource(LocalResource tezConfPBLRsrc) {
-    this.tezConfPBLRsrc = tezConfPBLRsrc;
+  public synchronized void stop() throws TezException, IOException {
+    LOG.info("Shutting down Tez Session"
+        + ", sessionName=" + sessionName
+        + ", applicationId=" + applicationId);
+    DAGClientAMProtocolBlockingPB proxy = TezClientUtils.getAMProxy(yarnClient,
+        sessionConfig.getYarnConfiguration(), applicationId);
+    if (proxy != null) {
+      try {
+        ShutdownSessionRequestProto request =
+            ShutdownSessionRequestProto.newBuilder().build();
+        proxy.shutdownSession(null, request);
+        return;
+      } catch (ServiceException e) {
+        LOG.info("Failed to shutdown Tez Session via proxy", e);
+      }
+    }
+    LOG.info("Could not connect to AM, killing session via YARN"
+        + ", sessionName=" + sessionName
+        + ", applicationId=" + applicationId);
+    try {
+      yarnClient.killApplication(applicationId);
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
   }
 
+  public String getSessionName() {
+    return sessionName;
+  }
+
+  @Private
+  @VisibleForTesting
+  public synchronized ApplicationId getApplicationId() {
+    return applicationId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6725cc59/tez-dag-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
new file mode 100644
index 0000000..61ca60b
--- /dev/null
+++ b/tez-dag-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.dag.api.TezConfiguration;
+
+public class TezSessionConfiguration {
+
+  private final AMConfiguration amConfiguration;
+  private final YarnConfiguration yarnConfig;
+  private final TezConfiguration tezConfig;
+
+  public TezSessionConfiguration(AMConfiguration amConfiguration,
+      TezConfiguration tezConfig) {
+    this.amConfiguration = amConfiguration;
+    this.tezConfig = tezConfig;
+    this.yarnConfig = new YarnConfiguration(tezConfig);
+  }
+
+  TezSessionConfiguration(AMConfiguration amConfiguration,
+      TezConfiguration tezConfig,
+      YarnConfiguration yarnConf) {
+    this.amConfiguration = amConfiguration;
+    this.tezConfig = tezConfig;
+    this.yarnConfig = yarnConf;
+  }
+
+  public AMConfiguration getAMConfiguration() {
+    return amConfiguration;
+  }
+
+  public YarnConfiguration getYarnConfiguration() {
+    return yarnConfig;
+  }
+
+  public TezConfiguration getTezConfiguration() {
+    return tezConfig;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6725cc59/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index da91804..cc41856 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -53,6 +53,10 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_LOG_LEVEL = TEZ_AM_PREFIX+"log.level";
   public static final String TEZ_AM_LOG_LEVEL_DEFAULT = "INFO";
 
+  public static final String TEZ_AM_JAVA_OPTS = TEZ_AM_PREFIX
+      + "java.opts";
+  public static final String DEFAULT_TEZ_AM_JAVA_OPTS = " -Xmx1024m ";
+
   public static final String TEZ_AM_CANCEL_DELEGATION_TOKEN = TEZ_AM_PREFIX +
       "am.complete.cancel.delegation.tokens";
   public static final boolean TEZ_AM_CANCEL_DELEGATION_TOKEN_DEFAULT = true;
@@ -113,27 +117,27 @@ public class TezConfiguration extends Configuration {
           + "shuffle-vertex-manager.max-src-fraction";
   public static final float
           TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT = 0.75f;
-  
+
   public static final String
           TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL = TEZ_AM_PREFIX +
           "shuffle-vertex-manager.enable.auto-parallel";
-  public static final boolean 
+  public static final boolean
           TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL_DEFAULT = false;
-  
+
   public static final String
           TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE = TEZ_AM_PREFIX +
           "shuffle-vertex-manager.desired-task-input-size";
-  public static final long 
-          TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT = 
+  public static final long
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT =
           1024*1024*100L;
 
   public static final String
           TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM = TEZ_AM_PREFIX +
           "shuffle-vertex-manager.min-task-parallelism";
-  public static final int 
+  public static final int
           TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM_DEFAULT = 1;
 
-  public static final String 
+  public static final String
           TEZ_AM_SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION = TEZ_AM_PREFIX
           + "slowstart-dag-scheduler.min-resource-fraction";
   public static final float
@@ -186,7 +190,7 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_CONTAINER_REUSE_DELAY_ALLOCATION_MILLIS = TEZ_AM_PREFIX
       + "container.reuse.delay-allocation-millis";
   public static final long TEZ_AM_CONTAINER_REUSE_DELAY_ALLOCATION_MILLIS_DEFAULT = 3000l;
-  
+
   public static final String TEZ_PB_BINARY_CONF_NAME = "tez-conf.pb";
   public static final String TEZ_PB_PLAN_BINARY_NAME = "tez-dag.pb";
   public static final String TEZ_PB_PLAN_TEXT_NAME = "tez-dag.pb.txt";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6725cc59/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index dd68705..dae5625 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -62,7 +63,7 @@ public class DAGClientRPCImpl implements DAGClient {
     this.dagId = dagId;
     this.conf = conf;
     yarnClient = new YarnClientImpl();
-    yarnClient.init(conf);
+    yarnClient.init(new YarnConfiguration(conf));
     yarnClient.start();
     appReport = null;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6725cc59/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index b573a8f..0747f0e 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.AMConfiguration;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.dag.api.DAG;
@@ -739,6 +740,11 @@ public class MRRSleepJob extends Configured implements Tool {
     ApplicationId appId =
         tezClient.createApplication();
 
+    conf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        conf.get(
+            TezConfiguration.TEZ_AM_STAGING_DIR,
+            TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT));
+
     Path remoteStagingDir =
         remoteFs.makeQualified(new Path(conf.get(
             TezConfiguration.TEZ_AM_STAGING_DIR,
@@ -751,12 +757,14 @@ public class MRRSleepJob extends Configured implements Tool {
         mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount,
         iReduceSleepTime, iReduceSleepCount);
 
-    List<String> amArgs = new ArrayList<String>();
-    amArgs.add(MRHelpers.getMRAMJavaOpts(conf));
+    conf.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
+        MRHelpers.getMRAMJavaOpts(conf));
+
+    AMConfiguration amConfig = new AMConfiguration(null, null,
+        null, conf, null);
 
     DAGClient dagClient =
-        tezClient.submitDAGApplication(appId, dag, remoteStagingDir,
-            null, null, dag.getName(), amArgs , null, null, conf);
+        tezClient.submitDAGApplication(appId, dag, amConfig);
 
     while (true) {
       DAGStatus status = dagClient.getDAGStatus();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6725cc59/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index f1e39be..0a6c98a 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -50,9 +50,11 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.AMConfiguration;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -158,14 +160,16 @@ public class OrderedWordCount {
           " already exists");
     }
 
-    String baseDir = Path.SEPARATOR + "user" + Path.SEPARATOR
-        + user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR;
-    Path stagingDir = new Path(baseDir + Path.SEPARATOR + appId.toString());
+    String stagingDirStr = Path.SEPARATOR + "user" + Path.SEPARATOR
+        + user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR
+        + Path.SEPARATOR + appId.toString();
+    Path stagingDir = new Path(stagingDirStr);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
     stagingDir = fs.makeQualified(stagingDir);
     TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
 
-    List<String> amArgs = new ArrayList<String>();
-    amArgs.add(MRHelpers.getMRAMJavaOpts(conf));
+    tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
+        MRHelpers.getMRAMJavaOpts(conf));
 
     String jarPath = ClassUtil.findContainingJar(OrderedWordCount.class);
     if (jarPath == null)  {
@@ -188,11 +192,15 @@ public class OrderedWordCount {
     commonLocalResources.put("dag_job.jar", dagJarLocalRsrc);
 
     TezSession tezSession = null;
+    AMConfiguration amConfig = new AMConfiguration("default", null,
+        commonLocalResources, tezConf, null);
     if (useTezSession) {
       LOG.info("Creating Tez Session");
-      tezSession = tezClient.createSession(appId, "OrderedWordCountSession",
-          stagingDir, null, "default", amArgs, null, commonLocalResources,
-          tezConf);
+      TezSessionConfiguration sessionConfig =
+          new TezSessionConfiguration(amConfig, tezConf);
+      tezSession = new TezSession("OrderedWordCountSession",
+          sessionConfig);
+      tezSession.start();
       LOG.info("Created Tez Session");
     }
 
@@ -311,13 +319,11 @@ public class OrderedWordCount {
     DAGClient dagClient;
     if (useTezSession) {
       LOG.info("Submitting DAG to Tez Session");
-      dagClient = tezClient.submitDAG(tezSession, dag);
+      dagClient = tezSession.submitDAG(dag);
       LOG.info("Submitted DAG to Tez Session");
     } else {
       LOG.info("Submitting DAG as a new Tez Application");
-      dagClient = tezClient.submitDAGApplication(appId, dag, stagingDir, null,
-          "default", "OrderedWordCount", amArgs, null, commonLocalResources,
-          tezConf);
+      dagClient = tezClient.submitDAGApplication(dag, amConfig);
     }
 
     DAGStatus dagStatus = null;
@@ -355,7 +361,7 @@ public class OrderedWordCount {
     } finally {
       fs.delete(stagingDir, true);
       if (useTezSession) {
-        tezClient.closeSession(tezSession);
+        tezSession.stop();
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6725cc59/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index df5ecb7..7eae23d 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -20,7 +20,6 @@ package org.apache.tez.mapreduce;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
@@ -50,8 +49,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.AMConfiguration;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -418,19 +419,25 @@ public class TestMRRJobsDAGApi {
     amLocalResources.put("yarn-site.xml", yarnSiteLr);
     amLocalResources.putAll(commonLocalResources);
 
-    TezClient tezClient = new TezClient(new TezConfiguration(
-        mrrTezCluster.getConfig()));
+    TezConfiguration tezConf = new TezConfiguration(
+            mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        remoteStagingDir.toString());
+
+    TezClient tezClient = new TezClient(tezConf);
     DAGClient dagClient = null;
     TezSession tezSession = null;
+    TezSessionConfiguration tezSessionConfig;
+    AMConfiguration amConfig = new AMConfiguration(
+        "default", commonEnv, amLocalResources,
+        tezConf, null);
     if(!dagViaRPC) {
       // TODO Use utility method post TEZ-205 to figure out AM arguments etc.
-      dagClient = tezClient.submitDAGApplication(dag, remoteStagingDir,
-          null, "default", Collections.singletonList(""), commonEnv,
-          amLocalResources, new TezConfiguration());
+      dagClient = tezClient.submitDAGApplication(dag, amConfig);
     } else {
-      tezSession = tezClient.createSession("testsession", remoteStagingDir,
-          null, "default", Collections.singletonList(""), commonEnv,
-          amLocalResources, new TezConfiguration());
+      tezSessionConfig = new TezSessionConfiguration(amConfig, tezConf);
+      tezSession = new TezSession("testsession", tezSessionConfig);
+      tezSession.start();
     }
 
     if (dagViaRPC && closeSessionBeforeSubmit) {
@@ -448,7 +455,7 @@ public class TestMRRJobsDAGApi {
         YarnApplicationState appState = appReport.getYarnApplicationState();
         if (!sentKillSession) {
           if (appState == YarnApplicationState.RUNNING) {
-            tezClient.closeSession(tezSession);
+            tezSession.stop();
             sentKillSession = true;
           }
         } else {
@@ -473,7 +480,7 @@ public class TestMRRJobsDAGApi {
     if(dagViaRPC) {
       LOG.info("Submitting dag to tez session with appId="
           + tezSession.getApplicationId());
-      dagClient = tezClient.submitDAG(tezSession, dag);
+      dagClient = tezSession.submitDAG(dag);
     }
     DAGStatus dagStatus = dagClient.getDAGStatus();
     while (!dagStatus.isCompleted()) {
@@ -484,7 +491,7 @@ public class TestMRRJobsDAGApi {
           && dagStatus.getState() == DAGStatus.State.RUNNING) {
         LOG.info("Killing running dag/session");
         if (dagViaRPC) {
-          tezClient.closeSession(tezSession);
+          tezSession.stop();
         } else {
           dagClient.tryKillDAG();
         }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6725cc59/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 7360af4..0799e09 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -27,7 +27,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
@@ -78,6 +77,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.client.AMConfiguration;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
@@ -550,6 +550,11 @@ public class YARNRunner implements ClientProtocol {
         MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
     vargs.add(mrAppMasterUserOptions);
 
+    StringBuilder javaOpts = new StringBuilder();
+    for (String varg : vargs) {
+      javaOpts.append(varg).append(" ");
+    }
+
     // Setup the CLASSPATH in environment
     // i.e. add { Hadoop jars, job jar, CWD } to classpath.
     Map<String, String> environment = new HashMap<String, String>();
@@ -558,22 +563,18 @@ public class YARNRunner implements ClientProtocol {
     MRHelpers.updateEnvironmentForMRAM(conf, environment);
 
     TezConfiguration dagAMConf = getDAGAMConfFromMRConf();
+    dagAMConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, javaOpts.toString());
 
     // Submit to ResourceManager
     try {
-      Path appStagingDir = fs.resolvePath(new Path(jobSubmitDir));
-      dagClient = tezClient.submitDAGApplication(
-          appId,
-          dag,
-          appStagingDir,
-          ts,
+      dagAMConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+          jobSubmitDir);
+      AMConfiguration amConfig = new AMConfiguration(
           jobConf.get(JobContext.QUEUE_NAME,
               YarnConfiguration.DEFAULT_QUEUE_NAME),
-          dag.getName(),
-          vargs,
           environment,
-          jobLocalResources, dagAMConf);
-
+          jobLocalResources, dagAMConf, ts);
+      tezClient.submitDAGApplication(appId, dag, amConfig);
     } catch (TezException e) {
       throw new IOException(e);
     }


Mime
View raw message