tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [2/2] git commit: TEZ-692. Unify job submission in either TezClient or TezSession (bikas)
Date Fri, 04 Jul 2014 01:15:08 GMT
TEZ-692. Unify job submission in either TezClient or TezSession (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6bb06d06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6bb06d06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6bb06d06

Branch: refs/heads/master
Commit: 6bb06d067e1342fe05acae108ded89c20cdf83de
Parents: c2db6b8
Author: Bikas Saha <bikas@apache.org>
Authored: Thu Jul 3 18:14:41 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Thu Jul 3 18:14:41 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/client/AMConfiguration.java  | 105 ++-
 .../apache/tez/client/TezAppMasterStatus.java   |  30 +
 .../java/org/apache/tez/client/TezClient.java   | 686 +++++++++++++++++--
 .../org/apache/tez/client/TezClientUtils.java   |  28 +-
 .../java/org/apache/tez/client/TezSession.java  | 447 ------------
 .../tez/client/TezSessionConfiguration.java     |  82 ---
 .../org/apache/tez/client/TezSessionStatus.java |  30 -
 .../org/apache/tez/common/TezCommonUtils.java   |   1 -
 .../apache/tez/dag/api/DagTypeConverters.java   |  14 +-
 .../apache/tez/dag/api/TezConfiguration.java    |   4 +
 .../org/apache/tez/client/TestTezClient.java    | 208 ++++++
 .../apache/tez/client/TestTezClientUtils.java   |   1 -
 .../java/org/apache/tez/common/TezUtils.java    |   2 -
 .../tez/dag/api/client/DAGClientHandler.java    |  14 +-
 ...DAGClientAMProtocolBlockingPBServerImpl.java |   4 +-
 .../dag/api/client/TestDAGClientHandler.java    |   8 +-
 .../examples/BroadcastAndOneToOneExample.java   |  29 +-
 .../mapreduce/examples/FilterLinesByWord.java   |   9 +-
 .../examples/FilterLinesByWordOneToOne.java     |   9 +-
 .../examples/GroupByOrderByMRRTest.java         |   4 +-
 .../mapreduce/examples/IntersectDataGen.java    |  19 +-
 .../mapreduce/examples/IntersectExample.java    |  21 +-
 .../mapreduce/examples/IntersectValidate.java   |  19 +-
 .../tez/mapreduce/examples/MRRSleepJob.java     |  26 +-
 .../mapreduce/examples/OrderedWordCount.java    |  54 +-
 .../tez/mapreduce/examples/UnionExample.java    |  26 +-
 .../tez/mapreduce/examples/WordCount.java       |  27 +-
 .../apache/tez/mapreduce/client/YARNRunner.java |  31 +-
 .../tez/mapreduce/hadoop/TestMRHelpers.java     |   3 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 132 ++--
 .../tez/test/FaultToleranceTestRunner.java      |  27 +-
 .../org/apache/tez/test/TestDAGRecovery.java    |  28 +-
 .../org/apache/tez/test/TestDAGRecovery2.java   |  41 +-
 .../org/apache/tez/test/TestFaultTolerance.java |  27 +-
 .../java/org/apache/tez/test/TestTezJobs.java   |  50 +-
 36 files changed, 1129 insertions(+), 1118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6bb06d06/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index abfcd27..a372031 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,7 @@ INCOMPATIBLE CHANGES
   TEZ-1131. Simplify EdgeManager APIs
   TEZ-1127. Add TEZ_TASK_JAVA_OPTS and TEZ_ENV configs to specify values from
   config
+  TEZ-692. Unify job submission in either TezClient or TezSession
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6bb06d06/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
index 7c9dd3c..3b4f4bf 100644
--- a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
@@ -18,86 +18,85 @@
 
 package org.apache.tez.client;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.common.TezCommonUtils;
-import org.apache.tez.common.TezYARNUtils;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
 
-public class AMConfiguration {
+import com.google.common.collect.Maps;
 
-  private final Path stagingDir;
-  private final String queueName;
-  private final Map<String, String> env;
-  private final Map<String, LocalResource> localResources;
-  private final TezConfiguration amConf;
-  private final Credentials credentials;
+@Private
+class AMConfiguration {
 
-  /**
-   * @param env
-   *          environment for the AM
-   * @param localResources
-   *          localResources which are required to run the AM
-   * @param conf
-   * @param credentials
-   *          credentials which will be needed in the AM. This includes
-   *          credentials which will be required to localize the specified
-   *          localResources.
-   */
-  public AMConfiguration(Map<String, String> env,
-      Map<String, LocalResource> localResources,
-      TezConfiguration conf, Credentials credentials) {
-    if (conf != null) {
-      this.amConf = conf;
-    } else {
-      this.amConf = new TezConfiguration();
-    }
-    this.queueName = this.amConf.get(TezConfiguration.TEZ_QUEUE_NAME);
+  private Map<String, LocalResource> localResources;
+  private TezConfiguration tezConf;
+  private Credentials credentials;
+  private YarnConfiguration yarnConfig;
+  private Map<String, String> env;
 
-    this.env = new HashMap<String, String>();
-    if (env != null) {
-      this.env.putAll(env);
+  AMConfiguration(TezConfiguration tezConf, Map<String, LocalResource> localResources,
+      Credentials credentials) {
+    this.localResources = Maps.newHashMap();
+    this.tezConf = tezConf;
+    if (localResources != null) {
+      addLocalResources(localResources);
+    }
+    if (credentials != null) {
+      setCredentials(credentials);
     }
 
-    this.localResources = localResources;
-    this.stagingDir = TezCommonUtils.getTezBaseStagingPath(amConf);
-    this.credentials = credentials;
   }
 
-  public Path getStagingDir() {
-    return stagingDir;
+  void addLocalResources(Map<String, LocalResource> localResources) {
+    this.localResources.putAll(localResources);
+  }
+  
+  void clearLocalResources() {
+    this.localResources.clear();
+  }
+  
+  void setCredentials(Credentials credentials) {
+    this.credentials = credentials;
+  }
+  
+  void setTezConfiguration(TezConfiguration tezConf) {
+    this.tezConf = tezConf;
+  }
+  
+  void setYarnConfiguration(YarnConfiguration yarnConf) {
+    this.yarnConfig = yarnConf;
   }
 
-  public String getQueueName() {
-    return queueName;
+  Path getStagingDir() {
+    return TezCommonUtils.getTezBaseStagingPath(tezConf);
   }
 
-  public Map<String, String> getEnv() {
-    return env;
+  String getQueueName() {
+    return this.tezConf.get(TezConfiguration.TEZ_QUEUE_NAME);
   }
 
-  public Map<String, LocalResource> getLocalResources() {
+  Map<String, LocalResource> getLocalResources() {
     return localResources;
   }
 
-  public TezConfiguration getAMConf() {
-    return amConf;
+  TezConfiguration getTezConfiguration() {
+    return tezConf;
   }
 
-  public Credentials getCredentials() {
+  YarnConfiguration getYarnConfiguration() {
+    return yarnConfig;
+  }
+  
+  Credentials getCredentials() {
     return credentials;
   }
-
-  public void isCompatible(AMConfiguration other) {
-    // TODO implement
+  
+  Map<String, String> getEnv() {
+    return env;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6bb06d06/tez-api/src/main/java/org/apache/tez/client/TezAppMasterStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezAppMasterStatus.java b/tez-api/src/main/java/org/apache/tez/client/TezAppMasterStatus.java
new file mode 100644
index 0000000..f8cd310
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/TezAppMasterStatus.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+public enum TezAppMasterStatus {
+  /** App Master initializing itself */
+  INITIALIZING,
+  /** App Master ready to run DAG */
+  READY,
+  /** App Master is running a DAG */
+  RUNNING,
+  /** App Master has shut down or is in the process of shutting down. */
+  SHUTDOWN
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6bb06d06/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index dff6d4b..d984877 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -1,94 +1,654 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.tez.client;
 
+import java.io.File;
 import java.io.IOException;
 import java.text.NumberFormat;
 import java.util.Map;
-import java.util.UUID;
+
+import javax.annotation.Nullable;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.TezYARNUtils;
 import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DAGSubmissionTimedOut;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ServiceException;
 
+/**
+ * TezClient is used to submit Tez DAGs for execution. DAG's are executed via a
+ * Tez App Master. TezClient can run the App Master in session or non-session
+ * mode. <br>
+ * In non-session mode, each DAG is executed in a different App Master that
+ * exits after the DAG execution completes. <br>
+ * In session mode, the TezClient creates a single instance of the App Master
+ * and all DAG's are submitted to the same App Master.<br>
+ * Session mode may give better performance when a series of DAGs need to be
+ * executed because it enables resource re-use across those DAGs. Non-session
+ * mode should be used when the user wants to submit a single DAG or wants to
+ * disconnect from the cluster after submitting a set of unrelated DAGs. <br>
+ * If API recommendations are followed, then the choice of running in session or
+ * non-session mode is transparent to writing the application. By changing the
+ * session mode configuration, the same application can be running in session or
+ * non-session mode.
+ */
 public class TezClient {
+
   private static final Log LOG = LogFactory.getLog(TezClient.class);
 
-  private final TezConfiguration conf;
-  private final YarnConfiguration yarnConf;
+  private final String clientName;
+  private ApplicationId sessionAppId;
+  private ApplicationId lastSubmittedAppId;
+  private AMConfiguration amConfig;
   private YarnClient yarnClient;
-  Map<String, LocalResource> tezJarResources = null;
+  private boolean isSession;
+  private boolean sessionStarted = false;
+  private boolean sessionStopped = false;
+  /** Tokens which will be required for all DAGs submitted to this session. */
+  private Credentials sessionCredentials = new Credentials();
+  private long clientTimeout;
+  Map<String, LocalResource> cachedTezJarResources;
+  private static final long SLEEP_FOR_READY = 500;
   private JobTokenSecretManager jobTokenSecretManager =
       new JobTokenSecretManager();
+  private Map<String, LocalResource> additionalLocalResources = Maps.newHashMap();
 
   /**
+   * Create a new TezClient. Session or non-session execution mode will be
+   * inferred from configuration. 
+   * @param name
+   *          Name of the client. Used for logging etc. This will also be used
+   *          as app master name in session mode
+   * @param tezConf
+   *          Configuration for the framework
+   */
+  public TezClient(String name, TezConfiguration tezConf) {
+    this(name, tezConf, tezConf.getBoolean(
+        TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT));    
+  }
+  
+  /**
+   * Create a new TezClient. Session or non-session execution mode will be
+   * inferred from configuration. Set the initial resources and security
+   * credentials for the App Master. If app master resources/credentials are
+   * needed then this is the recommended constructor for session mode execution.
+   * 
+   * @param name
+   *          Name of the client. Used for logging etc. This will also be used
+   *          as app master name in session mode
+   * @param tezConf
+   *          Configuration for the framework
+   * @param localResources
+   *          resources for the App Master
+   * @param credentials
+   *          Set security credentials to be used inside the app master, if
+   *          needed. Tez App Master needs credentials to access the staging
+   *          directory and for most HDFS cases these are automatically obtained
+   *          by Tez client. If the staging directory is on a file system for
+   *          which credentials cannot be obtained or for any credentials needed
+   *          by user code running inside the App Master, credentials must be
+   *          supplied by the user. These will be used by the App Master for the
+   *          next DAG. <br>
+   *          In session mode, credentials, if needed, must be set before
+   *          calling start()
+   */
+  public TezClient(String name, TezConfiguration tezConf,
+      @Nullable Map<String, LocalResource> localResources,
+      @Nullable Credentials credentials) {
+    this(name, tezConf, tezConf.getBoolean(
+        TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT),
+        localResources, credentials);
+  }
+  
+  /**
+   * Create a new TezClient with AM session mode set explicitly. This overrides
+   * the setting from configuration.
+   * @param name
+   *          Name of the client. Used for logging etc. This will also be used
+   *          as app master name in session mode
+   * @param tezConf Configuration for the framework
+   * @param isSession The AM will run in session mode or not
+   */
+  public TezClient(String name, TezConfiguration tezConf, boolean isSession) {
+    this(name, tezConf, isSession, null, null);
+  }
+
+  /**
+   * Create a new TezClient with AM session mode set explicitly. This overrides
+   * the setting from configuration.
+   * Set the initial resources and security credentials for the App Master.
+   * @param name
+   *          Name of the client. Used for logging etc. This will also be used
+   *          as app master name in session mode
+   * @param tezConf Configuration for the framework
+   * @param isSession The AM will run in session mode or not
+   * @param localResources resources for the App Master
+   * @param credentials credentials for the App Master
+   */
+  public TezClient(String name, TezConfiguration tezConf, boolean isSession,
+      @Nullable Map<String, LocalResource> localResources,
+      @Nullable Credentials credentials) {
+    this.clientName = name;
+    this.isSession = isSession;
+    this.amConfig = new AMConfiguration(tezConf, localResources, credentials);
+  }
+  
+  /**
+   * Add local resources for the DAG App Master. <br>
    * <p>
-   * Create an instance of the TezClient which will be used to communicate with
-   * a specific instance of YARN, or TezService when that exists.
-   * </p>
+   * In non-session mode these will be added to the resources of the App Master
+   * to be launched for the next DAG. Resources added via this method will
+   * accumulate and be used for every new App Master until
+   * clearAppMasterLocalResource() is invoked. <br>
    * <p>
-   * Separate instances of TezClient should be created to communicate with
-   * different instances of YARN
-   * </p>
-   *
-   * @param conf
-   *          the configuration which will be used to establish which YARN or
-   *          Tez service instance this client is associated with.
+   * In session mode, the recommended usage is to add all resources before
+   * calling start() so that all needed resources are available to the app
+   * master before it starts. When called after start(), these local resources
+   * will be re-localized to the running session DAG App Master and will be
+   * added to its classpath for execution of this DAG.
+   * <p>
+   * Caveats for invoking this method after start() in session mode: Resources
+   * accumulate across DAG submissions and are never removed from the classpath.
+   * Only LocalResourceType.FILE is supported. All resources will be treated as
+   * private.
+   * 
+   * @param localResources
+   */
+  public synchronized void addAppMasterLocalResources(Map<String, LocalResource> localResources) {
+    Preconditions.checkNotNull(localResources);
+    if (isSession && sessionStarted) {
+      additionalLocalResources.putAll(localResources);
+    }
+    amConfig.addLocalResources(localResources);
+  }
+  
+  /**
+   * If the next DAG App Master needs different local resources, then use this
+   * method to clear the local resources and then add the new local resources
+   * using addAppMasterLocalResources(). This method is a no-op in session mode,
+   * after start() is called.
+   */
+  public synchronized void clearAppMasterLocalResource() {
+    amConfig.clearLocalResources();
+  }
+  
+  /**
+   * Set security credentials to be used inside the app master, if needed. Tez App
+   * Master needs credentials to access the staging directory and for most HDFS
+   * cases these are automatically obtained by Tez client. If the staging
+   * directory is on a file system for which credentials cannot be obtained or
+   * for any credentials needed by user code running inside the App Master,
+   * credentials must be supplied by the user. These will be used by the App
+   * Master for the next DAG. <br>In session mode, credentials, if needed, must be
+   * set before calling start()
+   * 
+   * @param credentials
+   */
+  public synchronized void setAppMasterCredentials(Credentials credentials) {
+    Preconditions
+        .checkState(!sessionStarted,
+            "Credentials cannot be set after the session App Master has been started");
+    amConfig.setCredentials(credentials);
+  }
+  
+  /**
+   * Start the client. This establishes a connection to the YARN cluster.
+   * In session mode, this starts the App Master that runs all the DAGs in the
+   * session.
+   * @throws TezException
+   * @throws IOException
+   */
+  public synchronized void start() throws TezException, IOException {
+    amConfig.setYarnConfiguration(new YarnConfiguration(amConfig.getTezConfiguration()));
+
+    yarnClient = createYarnClient();
+    yarnClient.init(amConfig.getYarnConfiguration());
+    yarnClient.start();    
+
+    if (isSession) {
+      LOG.info("Session mode. Starting session.");
+      TezClientUtils.processTezLocalCredentialsFile(sessionCredentials,
+          amConfig.getTezConfiguration());
+  
+      Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);
+  
+      clientTimeout = amConfig.getTezConfiguration().getInt(
+          TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS,
+          TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT);
+  
+      try {
+        if (sessionAppId == null) {
+          sessionAppId = createApplication();
+        }
+  
+        // Add session token for shuffle
+        TezClientUtils.createSessionToken(sessionAppId.toString(),
+            jobTokenSecretManager, sessionCredentials);
+  
+        ApplicationSubmissionContext appContext =
+            TezClientUtils.createApplicationSubmissionContext(
+                amConfig.getTezConfiguration(), sessionAppId,
+                null, clientName, amConfig,
+                tezJarResources, sessionCredentials);
+  
+        // Set Tez Sessions to not retry on AM crashes if recovery is disabled
+        if (!amConfig.getTezConfiguration().getBoolean(
+            TezConfiguration.DAG_RECOVERY_ENABLED,
+            TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
+          appContext.setMaxAppAttempts(1);
+        }  
+        yarnClient.submitApplication(appContext);
+        sessionStarted = true;
+      } catch (YarnException e) {
+        throw new TezException(e);
+      }
+    }
+  }
+  
+  /**
+   * Submit a DAG. <br>In non-session mode, it submits a new App Master to the
+   * cluster.<br>In session mode, it submits the DAG to the session App Master. It
+   * blocks until either the DAG is submitted to the session or configured
+   * timeout period expires. Cleans up session if the submission timed out.
+   * 
+   * @param dag
+   *          DAG to be submitted to Session
+   * @return DAGClient to monitor the DAG
+   * @throws TezException
+   * @throws IOException
+   * @throws DAGSubmissionTimedOut
+   *           if submission timed out
+   */  
+  public synchronized DAGClient submitDAG(DAG dag) throws TezException, IOException,
+      InterruptedException {
+    if (isSession) {
+      return submitDAGSession(dag);
+    } else {
+      return submitDAGApplication(dag);
+    }
+  }
+
+  private synchronized DAGClient submitDAGSession(DAG dag)
+    throws TezException, IOException, InterruptedException {
+    Preconditions.checkState(isSession == true, 
+        "submitDAG with additional resources applies to only session mode. " + 
+        "In non-session mode please specify all resources in the initial configuration");
+    
+    verifySessionStateForSubmission();
+
+    String dagId = null;
+    LOG.info("Submitting dag to TezSession"
+      + ", sessionName=" + clientName
+      + ", applicationId=" + sessionAppId
+      + ", dagName=" + dag.getName());
+    
+    if (!additionalLocalResources.isEmpty()) {
+      for (LocalResource lr : additionalLocalResources.values()) {
+        Preconditions.checkArgument(lr.getType() == LocalResourceType.FILE, "LocalResourceType: "
+            + lr.getType() + " is not supported, only " + LocalResourceType.FILE + " is supported");
+      }
+    }
+
+    // Obtain DAG specific credentials.
+    TezClientUtils.setupDAGCredentials(dag, sessionCredentials, amConfig.getTezConfiguration());
+
+    // TODO TEZ-1229 - fix jar resources
+    // setup env
+    for (Vertex v : dag.getVertices()) {
+      Map<String, String> taskEnv = v.getTaskEnvironment();
+      TezYARNUtils.setupDefaultEnv(taskEnv, amConfig.getTezConfiguration(),
+          TezConfiguration.TEZ_TASK_LAUNCH_ENV, TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT);
+      TezClientUtils.setDefaultLaunchCmdOpts(v, amConfig.getTezConfiguration());
+    }
+    
+    DAGPlan dagPlan = dag.createDag(amConfig.getTezConfiguration());
+    SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder();
+    requestBuilder.setDAGPlan(dagPlan).build();
+    if (!additionalLocalResources.isEmpty()) {
+      requestBuilder.setAdditionalAmResources(DagTypeConverters
+          .convertFromLocalResources(additionalLocalResources));
+    }
+    
+    additionalLocalResources.clear();
+
+    DAGClientAMProtocolBlockingPB proxy = waitForProxy();
+    if (proxy == null) {
+      try {
+        LOG.warn("DAG submission to session timed out, stopping session");
+        stop();
+      } catch (Throwable t) {
+        LOG.info("Got an exception when trying to stop session", t);
+      }
+      throw new DAGSubmissionTimedOut("Could not submit DAG to Tez Session"
+          + ", timed out after " + clientTimeout + " seconds");
+    }
+
+    try {
+      SubmitDAGResponseProto response = proxy.submitDAG(null, requestBuilder.build());
+      // the following check is only for testing since the final class
+      // SubmitDAGResponseProto cannot be mocked
+      if (response != null) {
+        dagId = response.getDagId();
+      }
+    } catch (ServiceException e) {
+      throw new TezException(e);
+    }
+    LOG.info("Submitted dag to TezSession"
+        + ", sessionName=" + clientName
+        + ", applicationId=" + sessionAppId
+        + ", dagName=" + dag.getName());
+    return new DAGClientRPCImpl(sessionAppId, dagId,
+        amConfig.getTezConfiguration());
+  }
+
+  /**
+   * Stop the client. This terminates the connection to the YARN cluster.
+   * In session mode, this shuts down the session DAG App Master
+   * @throws TezException
+   * @throws IOException
+   */
+  public synchronized void stop() throws TezException, IOException {
+    try {
+      if (sessionStarted) {
+        LOG.info("Shutting down Tez Session"
+            + ", sessionName=" + clientName
+            + ", applicationId=" + sessionAppId);
+        sessionStopped = true;
+        boolean sessionShutdownSuccessful = false;
+        try {
+          DAGClientAMProtocolBlockingPB proxy = getSessionAMProxy(sessionAppId);
+          if (proxy != null) {
+            ShutdownSessionRequestProto request =
+                ShutdownSessionRequestProto.newBuilder().build();
+            proxy.shutdownSession(null, request);
+            sessionShutdownSuccessful = true;
+          }
+        } catch (TezException e) {
+          LOG.info("Failed to shutdown Tez Session via proxy", e);
+        } catch (ServiceException e) {
+          LOG.info("Failed to shutdown Tez Session via proxy", e);
+        }
+        if (!sessionShutdownSuccessful) {
+          LOG.info("Could not connect to AM, killing session via YARN"
+              + ", sessionName=" + clientName
+              + ", applicationId=" + sessionAppId);
+          try {
+            yarnClient.killApplication(sessionAppId);
+          } catch (YarnException e) {
+            throw new TezException(e);
+          }
+        }
+      }
+    } finally {
+      if (yarnClient != null) {
+        yarnClient.close();
+      }
+    }
+  }
+
+  /**
+   * Get the name of the client
+   * @return name
+   */
+  public String getClientName() {
+    return clientName;
+  }
+  
+  @Private
+  @VisibleForTesting
+  public synchronized ApplicationId getAppMasterApplicationId() {
+    ApplicationId appId = null;
+    if (isSession) {
+      appId = sessionAppId;
+    } else {
+      appId = lastSubmittedAppId;
+    }
+    return appId;
+  }
+
+  /**
+   * Get the status of the App Master executing the DAG.
+   * In non-session mode it returns the status of the last submitted DAG App Master.
+   * In session mode, it returns the status of the App Master hosting the session.
+   * 
+   * @return State of the session
+   * @throws TezException
+   * @throws IOException
+   */
+  public TezAppMasterStatus getAppMasterStatus() throws TezException, IOException {
+    // Supporting per-DAG app master case since user may choose to run the same 
+    // code in that mode and the code should continue to work. Its easy to provide 
+    // the correct view for per-DAG app master too.
+    ApplicationId appId = null;
+    if (isSession) {
+      appId = sessionAppId;
+    } else {
+      appId = lastSubmittedAppId;
+    }
+    Preconditions.checkState(appId != null, "Cannot get status without starting an application");
+    try {
+      ApplicationReport appReport = yarnClient.getApplicationReport(
+          appId);
+      switch (appReport.getYarnApplicationState()) {
+      case NEW:
+      case NEW_SAVING:
+      case ACCEPTED:
+      case SUBMITTED:
+        return TezAppMasterStatus.INITIALIZING;
+      case FINISHED:
+      case FAILED:
+      case KILLED:
+        return TezAppMasterStatus.SHUTDOWN;
+      case RUNNING:
+        if (!isSession) {
+          return TezAppMasterStatus.RUNNING;
+        }
+        try {
+          DAGClientAMProtocolBlockingPB proxy = getSessionAMProxy(appId);
+          if (proxy == null) {
+            return TezAppMasterStatus.INITIALIZING;
+          }
+          GetAMStatusResponseProto response = proxy.getAMStatus(null,
+              GetAMStatusRequestProto.newBuilder().build());
+          return DagTypeConverters.convertTezSessionStatusFromProto(
+              response.getStatus());
+        } catch (TezException e) {
+          LOG.info("Failed to retrieve AM Status via proxy", e);
+        } catch (ServiceException e) {
+          LOG.info("Failed to retrieve AM Status via proxy", e);
+        }
+      }
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
+    return TezAppMasterStatus.INITIALIZING;
+  }
+
+  /**
+   * Inform the Session to pre-warm containers for upcoming DAGs.
+   * Can be invoked multiple times on the same session.
+   * A subsequent call will release containers that are not compatible with the
+   * new context.
+   * This function can only be invoked if there is no DAG running on the Session
+   * This function can be a no-op if the Session already holds the required
+   * number of containers.
+   * After sending the request this method polls the App Master status once a
+   * second and returns when it reports READY.
+   * Note that the framework classpath is added to the CLASSPATH entry of the
+   * environment map returned by <code>context.getEnvironment()</code>.
+   *
+   * @param context Context for the pre-warm containers.
+   * @throws SessionNotRunning if the session was not started, was stopped,
+   *         could not be reached within the client timeout, or reports
+   *         SHUTDOWN while waiting
+   * @throws TezException if the RPC to the App Master fails
+   * @throws IOException propagated from proxy creation or the status lookup
+   * @throws InterruptedException if the calling thread is interrupted while
+   *         waiting for the proxy or while polling for READY
+   */
+  @Private
+  @InterfaceStability.Unstable
+  public void preWarm(PreWarmContext context)
+      throws IOException, TezException, InterruptedException {
+    verifySessionStateForSubmission();
+
+    try {
+      DAGClientAMProtocolBlockingPB proxy = waitForProxy();
+      if (proxy == null) {
+        throw new SessionNotRunning("Could not connect to Session within client"
+            + " timeout interval, timeoutSecs=" + clientTimeout);
+      }
+
+      String classpath = TezYARNUtils
+        .getFrameworkClasspath(amConfig.getYarnConfiguration());
+      Map<String, String> contextEnv = context.getEnvironment();
+      TezYARNUtils.addToEnvironment(contextEnv,
+        ApplicationConstants.Environment.CLASSPATH.name(),
+        classpath, File.pathSeparator);
+
+      DAGClientAMProtocolRPC.PreWarmRequestProto.Builder
+        preWarmReqProtoBuilder =
+          DAGClientAMProtocolRPC.PreWarmRequestProto.newBuilder();
+      preWarmReqProtoBuilder.setPreWarmContext(
+        DagTypeConverters.convertPreWarmContextToProto(context));
+      proxy.preWarm(null, preWarmReqProtoBuilder.build());
+      // Poll until the App Master reports READY. An interrupt during the
+      // sleep is deliberately NOT caught here: the method declares
+      // InterruptedException, and returning normally would make the caller
+      // believe the pre-warm had completed.
+      while (true) {
+        Thread.sleep(1000);
+        TezAppMasterStatus status = getAppMasterStatus();
+        if (status.equals(TezAppMasterStatus.READY)) {
+          break;
+        } else if (status.equals(TezAppMasterStatus.SHUTDOWN)) {
+          throw new SessionNotRunning("Could not connect to Session");
+        }
+      }
+    } catch (ServiceException e) {
+      throw new TezException(e);
+    }
+  }
+  
+  /**
+   * Wait till the DAG is ready to be submitted.
+   * In non-session mode this is a no-op since the application can be immediately
+   * submitted.
+   * In session mode, this waits for the session host to be ready to accept a DAG
+   * @throws IOException
+   * @throws TezException
    */
-  public TezClient(TezConfiguration conf) {
-    this.conf = conf;
-    this.yarnConf = new YarnConfiguration(conf);
-    yarnClient = new YarnClientImpl();
-    yarnClient.init(yarnConf);
-    yarnClient.start();
+  @InterfaceStability.Evolving
+  public void waitTillReady() throws IOException, TezException {
+    if (!isSession) {
+      // nothing to wait for in non-session mode
+      return;
+    }
+    verifySessionStateForSubmission();
+    // The signature cannot report interruption, so an interrupt does not end
+    // the wait. Remember that one happened and re-assert the thread's
+    // interrupt status on the way out so the caller can still observe it.
+    boolean interrupted = false;
+    try {
+      while (true) {
+        TezAppMasterStatus status = getAppMasterStatus();
+        if (status.equals(TezAppMasterStatus.SHUTDOWN)) {
+          throw new SessionNotRunning("TezSession has already shutdown");
+        }
+        if (status.equals(TezAppMasterStatus.READY)) {
+          return;
+        }
+        try {
+          Thread.sleep(SLEEP_FOR_READY);
+        } catch (InterruptedException e) {
+          LOG.info("Sleep interrupted", e);
+          interrupted = true;
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+  
+  // Creates the YarnClient via YarnClient.createYarnClient().
+  // Protected for testing, so that a test subclass can override it.
+  protected YarnClient createYarnClient() {
+    return YarnClient.createYarnClient();
+  }
+  
+  // Delegates to TezClientUtils.getSessionAMProxy using this client's
+  // YarnClient and YARN configuration. Protected for testing, so that a test
+  // subclass can override it. Callers in this class treat a null result as
+  // "no proxy available".
+  protected DAGClientAMProtocolBlockingPB getSessionAMProxy(ApplicationId appId) 
+      throws TezException, IOException {
+    return TezClientUtils.getSessionAMProxy(
+        yarnClient, amConfig.getYarnConfiguration(), appId);
   }
 
+  /**
+   * Repeatedly asks for the session App Master proxy, sleeping 100 ms between
+   * attempts, until one is obtained or the client timeout expires.
+   * clientTimeout is in seconds; a value of -1 disables the deadline.
+   *
+   * @return the proxy, or null if the deadline passed without obtaining one
+   */
+  private DAGClientAMProtocolBlockingPB waitForProxy()
+      throws IOException, TezException, InterruptedException {
+    final long deadline = System.currentTimeMillis() + (clientTimeout * 1000);
+    DAGClientAMProtocolBlockingPB amProxy = getSessionAMProxy(sessionAppId);
+    while (amProxy == null) {
+      Thread.sleep(100L);
+      if (clientTimeout != -1 && System.currentTimeMillis() > deadline) {
+        break;
+      }
+      amProxy = getSessionAMProxy(sessionAppId);
+    }
+    return amProxy;
+  }
 
-  public DAGClient submitDAGApplication(DAG dag, AMConfiguration amConfig)
+  /**
+   * Checks that this client is in session mode and that the session has been
+   * started and not yet stopped.
+   *
+   * @throws SessionNotRunning if the session was never started or was stopped
+   */
+  private void verifySessionStateForSubmission() throws SessionNotRunning {
+    Preconditions.checkState(isSession, "Invalid without session mode");
+    if (!sessionStarted) {
+      throw new SessionNotRunning("Session not started");
+    }
+    if (sessionStopped) {
+      throw new SessionNotRunning("Session stopped");
+    }
+  }
+  
+  private DAGClient submitDAGApplication(DAG dag)
       throws TezException, IOException {
     ApplicationId appId = createApplication();
-    return submitDAGApplication(appId, dag, amConfig);
+    return submitDAGApplication(appId, dag);
   }
 
   @Private
   // To be used only by YarnRunner
-  public DAGClient submitDAGApplication(ApplicationId appId,
-      DAG dag, AMConfiguration amConfig)
+  public DAGClient submitDAGApplication(ApplicationId appId, DAG dag)
           throws TezException, IOException {
+    LOG.info("Submitting DAG application with id: " + appId);
     try {
       // Use the AMCredentials object in client mode, since this won't be re-used.
       // Ensures we don't fetch credentially unnecessarily if the user has already provided them.
@@ -96,7 +656,8 @@ public class TezClient {
       if (credentials == null) {
         credentials = new Credentials();
       }
-      TezClientUtils.processTezLocalCredentialsFile(credentials, conf);
+      TezClientUtils.processTezLocalCredentialsFile(credentials, 
+          amConfig.getTezConfiguration());
 
       // Add session token for shuffle
       TezClientUtils.createSessionToken(appId.toString(),
@@ -104,25 +665,22 @@ public class TezClient {
 
       // Add credentials for tez-local resources.
       Map<String, LocalResource> tezJarResources = getTezJarResources(credentials);
-      ApplicationSubmissionContext appContext = TezClientUtils.createApplicationSubmissionContext(
-          conf, appId, dag, dag.getName(), amConfig, tezJarResources, credentials);
+      ApplicationSubmissionContext appContext = TezClientUtils
+          .createApplicationSubmissionContext(amConfig.getTezConfiguration(), 
+              appId, dag, dag.getName(), amConfig, tezJarResources, credentials);
       LOG.info("Submitting DAG to YARN"
           + ", applicationId=" + appId
           + ", dagName=" + dag.getName());
+      
       yarnClient.submitApplication(appContext);
+      lastSubmittedAppId = appId;
     } catch (YarnException e) {
       throw new TezException(e);
     }
-    return getDAGClient(appId);
+    return getDAGClient(appId, amConfig.getTezConfiguration());
   }
 
-  /**
-   * Create a new YARN application
-   * @return <code>ApplicationId</code> for the new YARN application
-   * @throws YarnException
-   * @throws IOException
-   */
-  public ApplicationId createApplication() throws TezException, IOException {
+  private ApplicationId createApplication() throws TezException, IOException {
     try {
       return yarnClient.createApplication().
           getNewApplicationResponse().getApplicationId();
@@ -133,17 +691,17 @@ public class TezClient {
 
   private synchronized Map<String, LocalResource> getTezJarResources(Credentials credentials)
       throws IOException {
-    if (tezJarResources == null) {
-      tezJarResources = TezClientUtils.setupTezJarsLocalResources(conf, credentials);
+    if (cachedTezJarResources == null) {
+      cachedTezJarResources = TezClientUtils.setupTezJarsLocalResources(
+          amConfig.getTezConfiguration(), credentials);
     }
-    return tezJarResources;
+    return cachedTezJarResources;
   }
 
-  @Private
-  public DAGClient getDAGClient(ApplicationId appId)
+  @Private // Used only for MapReduce compatibility code
+  public static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf)
       throws IOException, TezException {
-      return new DAGClientRPCImpl(appId, getDefaultTezDAGID(appId),
-                                   conf);
+      return new DAGClientRPCImpl(appId, getDefaultTezDAGID(appId), tezConf);
   }
 
   // DO NOT CHANGE THIS. This code is replicated from TezDAGID.java
@@ -159,7 +717,7 @@ public class TezClient {
     }
   };
 
-  String getDefaultTezDAGID(ApplicationId appId) {
+  private static String getDefaultTezDAGID(ApplicationId appId) {
      return (new StringBuilder(DAG)).append(SEPARATOR).
                    append(appId.getClusterTimestamp()).
                    append(SEPARATOR).

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6bb06d06/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index dd1b997..a0f13d3 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -337,10 +337,10 @@ public class TezClientUtils {
     // Setup resource requirements
     Resource capability = Records.newRecord(Resource.class);
     capability.setMemory(
-        amConfig.getAMConf().getInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
+        amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
             TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT));
     capability.setVirtualCores(
-        amConfig.getAMConf().getInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES,
+        amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES,
             TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT));
     if (LOG.isDebugEnabled()) {
       LOG.debug("AppMaster capability = " + capability);
@@ -375,14 +375,15 @@ public class TezClientUtils {
     List<String> vargs = new ArrayList<String>(8);
     vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
 
-    String amOpts = amConfig.getAMConf().get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
+    String amOpts = amConfig.getTezConfiguration().get(
+        TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
         TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT);
     amOpts = maybeAddDefaultMemoryJavaOpts(amOpts, capability,
-        amConfig.getAMConf().getDouble(TezConfiguration.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION,
+        amConfig.getTezConfiguration().getDouble(TezConfiguration.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION,
             TezConfiguration.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_DEFAULT));
     vargs.add(amOpts);
 
-    String amLogLevel = amConfig.getAMConf().get(
+    String amLogLevel = amConfig.getTezConfiguration().get(
         TezConfiguration.TEZ_AM_LOG_LEVEL,
         TezConfiguration.TEZ_AM_LOG_LEVEL_DEFAULT);
     maybeAddDefaultLoggingJavaOpts(amLogLevel, vargs);
@@ -418,14 +419,15 @@ public class TezClientUtils {
     TezYARNUtils.setupDefaultEnv(environment, conf, TezConfiguration.TEZ_AM_LAUNCH_ENV,
         TezConfiguration.TEZ_AM_LAUNCH_ENV_DEFAULT);
     
-    // finally apply env set in the code. This could potentially be removed in TEZ-692
+    // finally apply env set in the code. This could potentially be removed in
+    // TEZ-692
     if (amConfig.getEnv() != null) {
       for (Map.Entry<String, String> entry : amConfig.getEnv().entrySet()) {
-        TezYARNUtils.addToEnvironment(environment, entry.getKey(), entry.getValue(),
-            File.pathSeparator);
+        TezYARNUtils.addToEnvironment(environment, entry.getKey(),
+            entry.getValue(), File.pathSeparator);
       }
     }
-
+    
     Map<String, LocalResource> localResources =
         new TreeMap<String, LocalResource>();
 
@@ -437,7 +439,7 @@ public class TezClientUtils {
 
     // emit conf as PB file
     Configuration finalTezConf = createFinalTezConfForApp(conf,
-      amConfig.getAMConf());
+      amConfig.getTezConfiguration());
     
     FSDataOutputStream amConfPBOutBinaryStream = null;
     try {
@@ -517,7 +519,7 @@ public class TezClientUtils {
             TezConfiguration.TEZ_TASK_LAUNCH_ENV,
             TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT);
 
-        TezClientUtils.setDefaultLaunchCmdOpts(v, amConfig.getAMConf());
+        TezClientUtils.setDefaultLaunchCmdOpts(v, amConfig.getTezConfiguration());
       }
 
       // emit protobuf DAG file style
@@ -527,7 +529,7 @@ public class TezClientUtils {
             + tezSysStagingPath + " binaryConfPath :" + binaryConfPath + " sessionJarsPath :"
             + sessionJarsPath + " binaryPlanPath :" + binaryPath);
       }
-      amConfig.getAMConf().set(TezConfiguration.TEZ_AM_PLAN_REMOTE_PATH,
+      amConfig.getTezConfiguration().set(TezConfiguration.TEZ_AM_PLAN_REMOTE_PATH,
           binaryPath.toUri().toString());
 
       DAGPlan dagPB = dag.createDag(null);
@@ -576,7 +578,7 @@ public class TezClientUtils {
       appContext.setQueue(amConfig.getQueueName());
     }
     appContext.setApplicationName(amName);
-    appContext.setCancelTokensWhenComplete(amConfig.getAMConf().getBoolean(
+    appContext.setCancelTokensWhenComplete(amConfig.getTezConfiguration().getBoolean(
         TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN,
         TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN_DEFAULT));
     appContext.setAMContainerSpec(amContainer);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6bb06d06/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
deleted file mode 100644
index d73e9eb..0000000
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ /dev/null
@@ -1,447 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.client;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.tez.common.TezYARNUtils;
-import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.DAGSubmissionTimedOut;
-import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.SessionNotRunning;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
-import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
-import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ServiceException;
-
-public class TezSession {
-
-  private static final Log LOG = LogFactory.getLog(TezSession.class);
-
-  private final String sessionName;
-  private ApplicationId applicationId;
-  private final TezSessionConfiguration sessionConfig;
-  private YarnClient yarnClient;
-  private boolean sessionStarted = false;
-  private boolean sessionStopped = false;
-  /** Tokens which will be required for all DAGs submitted to this session. */
-  private Credentials sessionCredentials = new Credentials();
-  private long clientTimeout;
-  private static final long SLEEP_FOR_READY = 500;
-  private JobTokenSecretManager jobTokenSecretManager =
-      new JobTokenSecretManager();
-
-  public TezSession(String sessionName,
-      ApplicationId applicationId,
-      TezSessionConfiguration sessionConfig) {
-    this.sessionName = sessionName;
-    this.sessionConfig = sessionConfig;
-    this.applicationId = applicationId;
-  }
-
-  public TezSession(String sessionName,
-      TezSessionConfiguration sessionConfig) {
-    this(sessionName, null, sessionConfig);
-  }
-
-  /**
-   * Start a Tez Session
-   * @throws TezException
-   * @throws IOException
-   */
-  public synchronized void start() throws TezException, IOException {
-    yarnClient = YarnClient.createYarnClient();
-    yarnClient.init(sessionConfig.getYarnConfiguration());
-    yarnClient.start();
-    
-    TezClientUtils.processTezLocalCredentialsFile(sessionCredentials,
-        sessionConfig.getTezConfiguration());
-
-    Map<String, LocalResource> tezJarResources =
-        TezClientUtils.setupTezJarsLocalResources(
-          sessionConfig.getTezConfiguration(), sessionCredentials);
-
-    clientTimeout = sessionConfig.getTezConfiguration().getInt(
-        TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS,
-        TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT);
-
-    if (sessionConfig.getSessionResources() != null
-      && !sessionConfig.getSessionResources().isEmpty()) {
-      tezJarResources.putAll(sessionConfig.getSessionResources());
-    }
-
-    try {
-      if (applicationId == null) {
-        applicationId = yarnClient.createApplication().
-            getNewApplicationResponse().getApplicationId();
-      }
-
-      // Add session token for shuffle
-      TezClientUtils.createSessionToken(applicationId.toString(),
-          jobTokenSecretManager, sessionCredentials);
-
-      ApplicationSubmissionContext appContext =
-          TezClientUtils.createApplicationSubmissionContext(
-              sessionConfig.getTezConfiguration(), applicationId,
-              null, sessionName, sessionConfig.getAMConfiguration(),
-              tezJarResources, sessionCredentials);
-
-      // Set Tez Sessions to not retry on AM crashes if recovery is disabled
-      if (!sessionConfig.getAMConfiguration().getAMConf().getBoolean(
-          TezConfiguration.DAG_RECOVERY_ENABLED,
-          TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
-        appContext.setMaxAppAttempts(1);
-      }
-
-      yarnClient.submitApplication(appContext);
-    } catch (YarnException e) {
-      throw new TezException(e);
-    }
-    sessionStarted = true;
-  }
-
-  /**
-   * Submit a DAG to a Tez Session. Blocks until either the DAG is submitted to
-   * the session or configured timeout period expires. Cleans up session if the
-   * submission timed out.
-   * 
-   * Recommended API.
-   * 
-   * @param dag DAG to be submitted to Session
-   * @return DAGClient to monitor the DAG
-   * @throws TezException
-   * @throws IOException
-   * @throws SessionNotRunning if session is not alive
-   * @throws DAGSubmissionTimedOut if submission timed out
-   */  
-  public synchronized DAGClient submitDAG(DAG dag) throws TezException, IOException,
-      InterruptedException {
-    return submitDAG(dag, null);
-  }
-
-  /**
-   * Submit a DAG to a Tez Session. Blocks until either the DAG is submitted to
-   * the session or configured timeout period expires. Cleans up session if the
-   * submission timed out.
-   * 
-   * 
-   * 
-   * Allows specification of additional resources which will be added to the
-   * running AMs classpath for execution of this DAG.</p>
-   * Caveats: Resources stack across DAG submissions and are never removed from the classpath. Only
-   * LocalResourceType.FILE is supported. All resources will be treated as private.
-   * 
-   * This method is not recommended unless their is no choice but to add resources to an existing session.
-   * Recommended usage is to setup AM local resources up front via AMConfiguration.
-   * 
-   * @param dag
-   *          DAG to be submitted to Session
-   * @param additionalAmResources
-   *          additional resources which should be localized in the AM while
-   *          executing this DAG.
-   * @return DAGClient to monitor the DAG
-   * @throws TezException
-   * @throws IOException
-   * @throws SessionNotRunning
-   *           if session is not alive
-   * @throws DAGSubmissionTimedOut
-   *           if submission timed out
-   */
-  public synchronized DAGClient submitDAG(DAG dag, Map<String, LocalResource> additionalAmResources)
-    throws TezException, IOException, InterruptedException {
-    verifySessionStateForSubmission();
-
-    String dagId;
-    LOG.info("Submitting dag to TezSession"
-      + ", sessionName=" + sessionName
-      + ", applicationId=" + applicationId
-      + ", dagName=" + dag.getName());
-    
-    if (additionalAmResources != null && !additionalAmResources.isEmpty()) {
-      for (LocalResource lr : additionalAmResources.values()) {
-        Preconditions.checkArgument(lr.getType() == LocalResourceType.FILE, "LocalResourceType: "
-            + lr.getType() + " is not supported, only " + LocalResourceType.FILE + " is supported");
-      }
-    }
-
-    // Obtain DAG specific credentials.
-    TezClientUtils.setupDAGCredentials(dag, sessionCredentials, sessionConfig.getTezConfiguration());
-
-    // TODO TEZ-1229 - fix jar resources
-    // setup env
-    for (Vertex v : dag.getVertices()) {
-      Map<String, String> taskEnv = v.getTaskEnvironment();
-      TezYARNUtils.setupDefaultEnv(taskEnv, sessionConfig.getTezConfiguration(),
-          TezConfiguration.TEZ_TASK_LAUNCH_ENV, TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT);
-      TezClientUtils.setDefaultLaunchCmdOpts(v, sessionConfig.getTezConfiguration());
-    }
-    
-    DAGPlan dagPlan = dag.createDag(sessionConfig.getTezConfiguration());
-    SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder();
-    requestBuilder.setDAGPlan(dagPlan).build();
-    if (additionalAmResources != null && !additionalAmResources.isEmpty()) {
-      requestBuilder.setAdditionalAmResources(DagTypeConverters
-          .convertFromLocalResources(additionalAmResources));
-    }
-
-    DAGClientAMProtocolBlockingPB proxy = waitForProxy();
-    if (proxy == null) {
-      try {
-        LOG.warn("DAG submission to session timed out, stopping session");
-        stop();
-      } catch (Throwable t) {
-        LOG.info("Got an exception when trying to stop session", t);
-      }
-      throw new DAGSubmissionTimedOut("Could not submit DAG to Tez Session"
-          + ", timed out after " + clientTimeout + " seconds");
-    }
-
-    try {
-      dagId = proxy.submitDAG(null, requestBuilder.build()).getDagId();
-    } catch (ServiceException e) {
-      throw new TezException(e);
-    }
-    LOG.info("Submitted dag to TezSession"
-        + ", sessionName=" + sessionName
-        + ", applicationId=" + applicationId
-        + ", dagId=" + dagId);
-    return new DAGClientRPCImpl(applicationId, dagId,
-        sessionConfig.getTezConfiguration());
-  }
-
-  /**
-   * Shutdown a Tez Session.
-   * @throws TezException
-   * @throws IOException
-   */
-  public synchronized void stop() throws TezException, IOException {
-    if (!sessionStarted) {
-      LOG.info("Session not started. Ignoring stop command");
-      return;
-    }
-    LOG.info("Shutting down Tez Session"
-        + ", sessionName=" + sessionName
-        + ", applicationId=" + applicationId);
-    sessionStopped = true;
-    try {
-      DAGClientAMProtocolBlockingPB proxy = TezClientUtils.getSessionAMProxy(
-          yarnClient, sessionConfig.getYarnConfiguration(), applicationId);
-      if (proxy != null) {
-        ShutdownSessionRequestProto request =
-            ShutdownSessionRequestProto.newBuilder().build();
-        proxy.shutdownSession(null, request);
-        return;
-      }
-    } catch (TezException e) {
-      LOG.info("Failed to shutdown Tez Session via proxy", e);
-    } catch (ServiceException e) {
-      LOG.info("Failed to shutdown Tez Session via proxy", e);
-    }
-    LOG.info("Could not connect to AM, killing session via YARN"
-        + ", sessionName=" + sessionName
-        + ", applicationId=" + applicationId);
-    try {
-      yarnClient.killApplication(applicationId);
-    } catch (YarnException e) {
-      throw new TezException(e);
-    }
-  }
-
-  public String getSessionName() {
-    return sessionName;
-  }
-
-  @Private
-  @VisibleForTesting
-  public synchronized ApplicationId getApplicationId() {
-    return applicationId;
-  }
-
-  public TezSessionStatus getSessionStatus() throws TezException, IOException {
-    try {
-      ApplicationReport appReport = yarnClient.getApplicationReport(
-          applicationId);
-      switch (appReport.getYarnApplicationState()) {
-      case NEW:
-      case NEW_SAVING:
-      case ACCEPTED:
-      case SUBMITTED:
-        return TezSessionStatus.INITIALIZING;
-      case FINISHED:
-      case FAILED:
-      case KILLED:
-        return TezSessionStatus.SHUTDOWN;
-      case RUNNING:
-        try {
-          DAGClientAMProtocolBlockingPB proxy = TezClientUtils.getSessionAMProxy(
-              yarnClient, sessionConfig.getYarnConfiguration(), applicationId);
-          if (proxy == null) {
-            return TezSessionStatus.INITIALIZING;
-          }
-          GetAMStatusResponseProto response = proxy.getAMStatus(null,
-              GetAMStatusRequestProto.newBuilder().build());
-          return DagTypeConverters.convertTezSessionStatusFromProto(
-              response.getStatus());
-        } catch (TezException e) {
-          LOG.info("Failed to retrieve AM Status via proxy", e);
-        } catch (ServiceException e) {
-          LOG.info("Failed to retrieve AM Status via proxy", e);
-        }
-      }
-    } catch (YarnException e) {
-      throw new TezException(e);
-    }
-    return TezSessionStatus.INITIALIZING;
-  }
-
-  /**
-   * Inform the Session to pre-warm containers for upcoming DAGs.
-   * Can be invoked multiple times on the same session.
-   * A subsequent call will release containers that are not compatible with the
-   * new context.
-   * This function can only be invoked if there is no DAG running on the Session
-   * This function can be a no-op if the Session already holds the required
-   * number of containers.
-   * @param context Context for the pre-warm containers.
-   */
-  @Private
-  @InterfaceStability.Unstable
-  public void preWarm(PreWarmContext context)
-      throws IOException, TezException, InterruptedException {
-    verifySessionStateForSubmission();
-
-    try {
-      DAGClientAMProtocolBlockingPB proxy = waitForProxy();
-      if (proxy == null) {
-        throw new SessionNotRunning("Could not connect to Session within client"
-            + " timeout interval, timeoutSecs=" + clientTimeout);
-      }
-
-      String classpath = TezYARNUtils
-        .getFrameworkClasspath(sessionConfig.getYarnConfiguration());
-      Map<String, String> contextEnv = context.getEnvironment();
-      TezYARNUtils.addToEnvironment(contextEnv,
-        ApplicationConstants.Environment.CLASSPATH.name(),
-        classpath, File.pathSeparator);
-
-      DAGClientAMProtocolRPC.PreWarmRequestProto.Builder
-        preWarmReqProtoBuilder =
-          DAGClientAMProtocolRPC.PreWarmRequestProto.newBuilder();
-      preWarmReqProtoBuilder.setPreWarmContext(
-        DagTypeConverters.convertPreWarmContextToProto(context));
-      proxy.preWarm(null, preWarmReqProtoBuilder.build());
-      while (true) {
-        try {
-          Thread.sleep(1000);
-          TezSessionStatus status = getSessionStatus();
-          if (status.equals(TezSessionStatus.READY)) {
-            break;
-          } else if (status.equals(TezSessionStatus.SHUTDOWN)) {
-            throw new SessionNotRunning("Could not connect to Session");
-          }
-        } catch (InterruptedException e) {
-          return;
-        }
-      }
-    } catch (ServiceException e) {
-      throw new TezException(e);
-    }
-  }
-  /**
-   * Wait until TEZ session is ready.
-   *
-   * @throws IOException
-   * @throws TezException
-   */
-  @InterfaceStability.Evolving
-  public void waitTillReady() throws IOException, TezException {
-    verifySessionStateForSubmission();
-    while (true) {
-      TezSessionStatus status = getSessionStatus();
-      if (status.equals(TezSessionStatus.SHUTDOWN)) {
-        throw new SessionNotRunning("TezSession has already shutdown");
-      }
-      if (status.equals(TezSessionStatus.READY)) {
-        return;
-      }
-      try {
-        Thread.sleep(SLEEP_FOR_READY);
-      } catch (InterruptedException e) {
-        LOG.info("Sleep interrupted", e);
-        continue;
-      }
-    }
-  }
-
-  private DAGClientAMProtocolBlockingPB waitForProxy()
-      throws IOException, TezException, InterruptedException {
-    long startTime = System.currentTimeMillis();
-    long endTime = startTime + (clientTimeout * 1000);
-    DAGClientAMProtocolBlockingPB proxy = null;
-    while (true) {
-      proxy = TezClientUtils.getSessionAMProxy(yarnClient,
-          sessionConfig.getYarnConfiguration(), applicationId);
-      if (proxy != null) {
-        break;
-      }
-      Thread.sleep(100l);
-      if (clientTimeout != -1 && System.currentTimeMillis() > endTime) {
-        break;
-      }
-    }
-    return proxy;
-  }
-
-  private void verifySessionStateForSubmission() throws SessionNotRunning {
-    if (!sessionStarted) {
-      throw new SessionNotRunning("Session not started");
-    } else if (sessionStopped) {
-      throw new SessionNotRunning("Session stopped");
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6bb06d06/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
deleted file mode 100644
index 2ac5b6c..0000000
--- a/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.client;
-
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.tez.dag.api.TezConfiguration;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.TreeMap;
-
-public class TezSessionConfiguration {
-
-  private final AMConfiguration amConfiguration;
-  private final YarnConfiguration yarnConfig;
-  private final TezConfiguration tezConfig;
-  private final Map<String, LocalResource> sessionResources;
-
-  public TezSessionConfiguration(AMConfiguration amConfiguration,
-      TezConfiguration tezConfig) {
-    this(amConfiguration, tezConfig, new YarnConfiguration(tezConfig));
-  }
-
-  TezSessionConfiguration(AMConfiguration amConfiguration,
-                          TezConfiguration tezConfig,
-                          YarnConfiguration yarnConf) {
-    this(amConfiguration, tezConfig, yarnConf,
-      new TreeMap<String, LocalResource>());
-  }
-
-  /**
-   * TezSessionConfiguration constructor
-   * @param amConfiguration AM Configuration @see AMConfiguration
-   * @param tezConfig Tez Configuration
-   * @param yarnConf Yarn Configuration
-   * @param sessionResources LocalResources accessible to all tasks that are
-   *                         launched within this session.
-   */
-  TezSessionConfiguration(AMConfiguration amConfiguration,
-      TezConfiguration tezConfig,
-      YarnConfiguration yarnConf,
-      Map<String, LocalResource> sessionResources) {
-    this.amConfiguration = amConfiguration;
-    this.tezConfig = tezConfig;
-    this.yarnConfig = yarnConf;
-    this.sessionResources = sessionResources;
-  }
-
-  public AMConfiguration getAMConfiguration() {
-    return amConfiguration;
-  }
-
-  public YarnConfiguration getYarnConfiguration() {
-    return yarnConfig;
-  }
-
-  public TezConfiguration getTezConfiguration() {
-    return tezConfig;
-  }
-
-  public Map<String, LocalResource> getSessionResources() {
-    return Collections.unmodifiableMap(sessionResources);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6bb06d06/tez-api/src/main/java/org/apache/tez/client/TezSessionStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSessionStatus.java b/tez-api/src/main/java/org/apache/tez/client/TezSessionStatus.java
deleted file mode 100644
index 3d95482..0000000
--- a/tez-api/src/main/java/org/apache/tez/client/TezSessionStatus.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.client;
-
-public enum TezSessionStatus {
-  /** Session is initializing itself */
-  INITIALIZING,
-  /** Session ready to receive DAG submissions */
-  READY,
-  /** Session is running a DAG */
-  RUNNING,
-  /** Session has shut down or is in the process of shutting down. */
-  SHUTDOWN
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6bb06d06/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index a7c1031..db434ae 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6bb06d06/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index cb2c35b..87573f3 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.client.TezAppMasterStatus;
 import org.apache.tez.common.TezUserPayload;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
@@ -342,24 +342,24 @@ public class DagTypeConverters {
     return new ProcessorDescriptor(className).setUserPayload(bb);
   }
 
-  public static TezSessionStatus convertTezSessionStatusFromProto(
+  public static TezAppMasterStatus convertTezSessionStatusFromProto(
       TezSessionStatusProto proto) {
     switch (proto) {
     case INITIALIZING:
-      return TezSessionStatus.INITIALIZING;
+      return TezAppMasterStatus.INITIALIZING;
     case READY:
-      return TezSessionStatus.READY;
+      return TezAppMasterStatus.READY;
     case RUNNING:
-      return TezSessionStatus.RUNNING;
+      return TezAppMasterStatus.RUNNING;
     case SHUTDOWN:
-      return TezSessionStatus.SHUTDOWN;
+      return TezAppMasterStatus.SHUTDOWN;
     }
     throw new TezUncheckedException("Could not convert to TezSessionStatus from"
         + " proto");
   }
 
   public static TezSessionStatusProto convertTezSessionStatusToProto(
-      TezSessionStatus status) {
+      TezAppMasterStatus status) {
     switch (status) {
     case INITIALIZING:
       return TezSessionStatusProto.INITIALIZING;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6bb06d06/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 610f919..86d4e58 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -49,6 +49,10 @@ public class TezConfiguration extends Configuration {
 
   public static final String TEZ_APPLICATION_MASTER_CLASS =
       "org.apache.tez.dag.app.DAGAppMaster";
+  
+  /** Execution mode for submitting DAGs to the AM */
+  public static final String TEZ_AM_SESSION_MODE = TEZ_AM_PREFIX + "mode.session";
+  public static boolean TEZ_AM_SESSION_MODE_DEFAULT = false;
 
   /** Root Logging level passed to the Tez app master.*/
   public static final String TEZ_AM_LOG_LEVEL = TEZ_AM_PREFIX + "log.level";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6bb06d06/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
new file mode 100644
index 0000000..9039bf4
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import java.io.IOException;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import com.google.common.collect.Maps;
+import com.google.protobuf.RpcController;
+
+
+
+public class TestTezClient {
+
+  class TezClientForTest extends TezClient {
+    YarnClient mockYarnClient;
+    DAGClientAMProtocolBlockingPB sessionAmProxy;
+
+    public TezClientForTest(String name, TezConfiguration tezConf,
+        @Nullable Map<String, LocalResource> localResources,
+        @Nullable Credentials credentials) {
+      super(name, tezConf, localResources, credentials);
+    }
+    
+    @Override
+    protected YarnClient createYarnClient() {
+      return mockYarnClient;
+    }
+    
+    @Override
+    protected DAGClientAMProtocolBlockingPB getSessionAMProxy(ApplicationId appId) 
+        throws TezException, IOException {
+      return sessionAmProxy;
+    }
+  }
+  
+  @Test
+  public void testTezclientApp() throws Exception {
+    testTezClient(false);
+  }
+  
+  @Test
+  public void testTezclientSession() throws Exception {
+    testTezClient(true);
+  }
+  
+  public void testTezClient(boolean isSession) throws Exception {
+    TezConfiguration conf = new TezConfiguration();
+    conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
+
+    Map<String, LocalResource> lrs = Maps.newHashMap();
+    String lrName1 = "LR1";
+    lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file:///", "localhost", 0, "test"),
+        LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
+    
+    TezClientForTest client = new TezClientForTest("test", conf, lrs, null);
+    
+    ApplicationId appId1 = ApplicationId.newInstance(0, 1);
+    YarnClient yarnClient = mock(YarnClient.class, RETURNS_DEEP_STUBS);
+    when(yarnClient.createApplication().getNewApplicationResponse().getApplicationId()).thenReturn(appId1);
+    ArgumentCaptor<ApplicationSubmissionContext> captor = ArgumentCaptor.forClass(ApplicationSubmissionContext.class);
+
+    DAGClientAMProtocolBlockingPB sessionAmProxy = mock(DAGClientAMProtocolBlockingPB.class, RETURNS_DEEP_STUBS);
+    
+    client.sessionAmProxy = sessionAmProxy;
+    client.mockYarnClient = yarnClient;
+    
+    client.start();
+    verify(yarnClient, times(1)).init((Configuration)any());
+    verify(yarnClient, times(1)).start();
+    if (isSession) {
+      verify(yarnClient, times(1)).submitApplication(captor.capture());
+      ApplicationSubmissionContext context = captor.getValue();
+      Assert.assertEquals(3, context.getAMContainerSpec().getLocalResources().size());
+      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+          TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME));
+      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+          TezConfiguration.TEZ_PB_BINARY_CONF_NAME));
+      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+          lrName1));
+    } else {
+      verify(yarnClient, times(0)).submitApplication(captor.capture());
+    }
+    
+    DAG dag = new DAG("DAG").addVertex(
+        new Vertex("Vertex", new ProcessorDescriptor("P"), 1, Resource.newInstance(1, 1)));
+    DAGClient dagClient = client.submitDAG(dag);
+    
+    Assert.assertEquals(appId1, dagClient.getApplicationId());
+    
+    if (isSession) {
+      verify(yarnClient, times(1)).submitApplication(captor.capture());
+      verify(sessionAmProxy, times(1)).submitDAG((RpcController)any(), (SubmitDAGRequestProto) any());
+    } else {
+      verify(yarnClient, times(1)).submitApplication(captor.capture());
+      ApplicationSubmissionContext context = captor.getValue();
+      Assert.assertEquals(4, context.getAMContainerSpec().getLocalResources().size());
+      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+          TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME));
+      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+          TezConfiguration.TEZ_PB_BINARY_CONF_NAME));
+      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+          TezConfiguration.TEZ_PB_PLAN_BINARY_NAME));
+      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+          lrName1));
+    }
+    
+    // add resources
+    String lrName2 = "LR2";
+    lrs.clear();
+    lrs.put(lrName2, LocalResource.newInstance(URL.newInstance("file:///", "localhost", 0, "test"),
+        LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
+    client.addAppMasterLocalResources(lrs);
+    
+    ApplicationId appId2 = ApplicationId.newInstance(0, 2);
+    when(yarnClient.createApplication().getNewApplicationResponse().getApplicationId()).thenReturn(appId2);
+    
+    dag = new DAG("DAG").addVertex(
+        new Vertex("Vertex", new ProcessorDescriptor("P"), 1, Resource.newInstance(1, 1)));
+    dagClient = client.submitDAG(dag);
+    
+    if (isSession) {
+      // same app master
+      verify(yarnClient, times(1)).submitApplication(captor.capture());
+      Assert.assertEquals(appId1, dagClient.getApplicationId());
+      // additional resource is sent
+      ArgumentCaptor<SubmitDAGRequestProto> captor1 = ArgumentCaptor.forClass(SubmitDAGRequestProto.class);
+      verify(sessionAmProxy, times(2)).submitDAG((RpcController)any(), captor1.capture());
+      SubmitDAGRequestProto proto = captor1.getValue();
+      Assert.assertEquals(1, proto.getAdditionalAmResources().getLocalResourcesCount());
+      Assert.assertEquals(lrName2, proto.getAdditionalAmResources().getLocalResources(0).getName());
+    } else {
+      // new app master
+      Assert.assertEquals(appId2, dagClient.getApplicationId());
+      verify(yarnClient, times(2)).submitApplication(captor.capture());
+      // additional resource is added
+      ApplicationSubmissionContext context = captor.getValue();
+      Assert.assertEquals(5, context.getAMContainerSpec().getLocalResources().size());
+      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+          TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME));
+      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+          TezConfiguration.TEZ_PB_BINARY_CONF_NAME));
+      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+          TezConfiguration.TEZ_PB_PLAN_BINARY_NAME));
+      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+          lrName1));
+      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+          lrName2));
+    }
+    
+    
+    
+    client.stop();
+    if (isSession) {
+      verify(sessionAmProxy, times(1)).shutdownSession((RpcController) any(), (ShutdownSessionRequestProto)any());
+    }
+    verify(yarnClient, times(1)).stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6bb06d06/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
index e519187..e983534 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
@@ -19,7 +19,6 @@ package org.apache.tez.client;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.FileNotFoundException;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6bb06d06/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index 9158f52..cfa3413 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -18,12 +18,10 @@
 package org.apache.tez.common;
 
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.PrintStream;
 import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6bb06d06/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
index e1392d1..cbd48fa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
@@ -10,7 +10,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.client.TezAppMasterStatus;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.DAGAppMaster;
@@ -92,7 +92,7 @@ public class DAGClientHandler {
     }
   }
 
-  public synchronized TezSessionStatus getSessionStatus() throws TezException {
+  public synchronized TezAppMasterStatus getSessionStatus() throws TezException {
     if (!dagAppMaster.isSession()) {
       throw new TezException("Unsupported operation as AM not running in"
           + " session mode");
@@ -100,19 +100,19 @@ public class DAGClientHandler {
     switch (dagAppMaster.getState()) {
     case NEW:
     case INITED:
-      return TezSessionStatus.INITIALIZING;
+      return TezAppMasterStatus.INITIALIZING;
     case IDLE:
-      return TezSessionStatus.READY;
+      return TezAppMasterStatus.READY;
     case RECOVERING:
     case RUNNING:
-      return TezSessionStatus.RUNNING;
+      return TezAppMasterStatus.RUNNING;
     case ERROR:
     case FAILED:
     case SUCCEEDED:
     case KILLED:
-      return TezSessionStatus.SHUTDOWN;
+      return TezAppMasterStatus.SHUTDOWN;
     }
-    return TezSessionStatus.INITIALIZING;
+    return TezAppMasterStatus.INITIALIZING;
   }
 
   public synchronized void preWarmContainers(PreWarmContext preWarmContext)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6bb06d06/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index 6f088d4..d47bff0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -22,7 +22,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.client.TezAppMasterStatus;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGClientHandler;
@@ -150,7 +150,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
   public GetAMStatusResponseProto getAMStatus(RpcController controller,
       GetAMStatusRequestProto request) throws ServiceException {
     try {
-      TezSessionStatus sessionStatus = real.getSessionStatus();
+      TezAppMasterStatus sessionStatus = real.getSessionStatus();
       return GetAMStatusResponseProto.newBuilder().setStatus(
           DagTypeConverters.convertTezSessionStatusToProto(sessionStatus))
           .build();


Mime
View raw message