hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject [12/50] [abbrv] hadoop git commit: YARN-6255. Refactor yarn-native-services framework. Contributed by Jian He
Date Wed, 21 Jun 2017 18:33:34 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68ec5e78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
index 00e2b62..f4ea70b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -20,7 +20,6 @@ package org.apache.slider.client;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.Files;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
@@ -35,7 +34,6 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.registry.client.api.RegistryConstants;
 import org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
@@ -55,40 +53,44 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.Times;
-import org.apache.slider.api.ClusterDescription;
 import org.apache.slider.api.ClusterNode;
 import org.apache.slider.api.SliderApplicationApi;
 import org.apache.slider.api.SliderClusterProtocol;
-import org.apache.slider.api.StateValues;
 import org.apache.slider.api.proto.Messages;
+import org.apache.slider.api.resource.Application;
+import org.apache.slider.api.resource.Component;
 import org.apache.slider.api.types.ContainerInformation;
 import org.apache.slider.api.types.NodeInformationList;
-import org.apache.slider.api.types.SliderInstanceDescription;
 import org.apache.slider.client.ipc.SliderApplicationIpcClient;
 import org.apache.slider.client.ipc.SliderClusterOperations;
 import org.apache.slider.common.Constants;
 import org.apache.slider.common.SliderExitCodes;
 import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.SliderXmlConfKeys;
 import org.apache.slider.common.params.AbstractActionArgs;
 import org.apache.slider.common.params.AbstractClusterBuildingActionArgs;
 import org.apache.slider.common.params.ActionAMSuicideArgs;
 import org.apache.slider.common.params.ActionClientArgs;
 import org.apache.slider.common.params.ActionCreateArgs;
 import org.apache.slider.common.params.ActionDependencyArgs;
-import org.apache.slider.common.params.ActionDestroyArgs;
 import org.apache.slider.common.params.ActionDiagnosticArgs;
 import org.apache.slider.common.params.ActionEchoArgs;
 import org.apache.slider.common.params.ActionExistsArgs;
@@ -113,20 +115,13 @@ import org.apache.slider.common.params.ActionUpgradeArgs;
 import org.apache.slider.common.params.Arguments;
 import org.apache.slider.common.params.ClientArgs;
 import org.apache.slider.common.params.CommonArgs;
-import org.apache.slider.common.params.LaunchArgsAccessor;
 import org.apache.slider.common.tools.ConfigHelper;
-import org.apache.slider.common.tools.Duration;
 import org.apache.slider.common.tools.SliderFileSystem;
 import org.apache.slider.common.tools.SliderUtils;
 import org.apache.slider.common.tools.SliderVersionInfo;
-import org.apache.slider.core.buildutils.InstanceBuilder;
 import org.apache.slider.core.buildutils.InstanceIO;
 import org.apache.slider.core.conf.AggregateConf;
 import org.apache.slider.core.conf.ConfTree;
-import org.apache.slider.core.conf.ConfTreeOperations;
-import org.apache.slider.core.conf.MapOperations;
-import org.apache.slider.core.conf.ResourcesInputPropertiesValidator;
-import org.apache.slider.core.conf.TemplateInputPropertiesValidator;
 import org.apache.slider.core.exceptions.BadClusterStateException;
 import org.apache.slider.core.exceptions.BadCommandArgumentsException;
 import org.apache.slider.core.exceptions.BadConfigException;
@@ -137,18 +132,13 @@ import org.apache.slider.core.exceptions.SliderException;
 import org.apache.slider.core.exceptions.UnknownApplicationInstanceException;
 import org.apache.slider.core.exceptions.UsageException;
 import org.apache.slider.core.exceptions.WaitTimeoutException;
-import org.apache.slider.core.launch.AppMasterLauncher;
 import org.apache.slider.core.launch.ClasspathConstructor;
 import org.apache.slider.core.launch.CredentialUtils;
 import org.apache.slider.core.launch.JavaCommandLineBuilder;
-import org.apache.slider.core.launch.LaunchedApplication;
 import org.apache.slider.core.launch.SerializedApplicationReport;
 import org.apache.slider.core.main.RunService;
-import org.apache.slider.core.persist.AppDefinitionPersister;
 import org.apache.slider.core.persist.ApplicationReportSerDeser;
-import org.apache.slider.core.persist.ConfPersister;
 import org.apache.slider.core.persist.JsonSerDeser;
-import org.apache.slider.core.persist.LockAcquireFailedException;
 import org.apache.slider.core.registry.SliderRegistryUtils;
 import org.apache.slider.core.registry.YarnAppListClient;
 import org.apache.slider.core.registry.docstore.ConfigFormat;
@@ -160,19 +150,19 @@ import org.apache.slider.core.registry.docstore.PublishedExportsSet;
 import org.apache.slider.core.registry.retrieve.RegistryRetriever;
 import org.apache.slider.core.zk.BlockingZKWatcher;
 import org.apache.slider.core.zk.ZKIntegration;
-import org.apache.slider.core.zk.ZKPathBuilder;
 import org.apache.slider.providers.AbstractClientProvider;
+import org.apache.slider.providers.ProviderUtils;
 import org.apache.slider.providers.SliderProviderFactory;
 import org.apache.slider.providers.agent.AgentKeys;
-import org.apache.slider.providers.docker.DockerClientProvider;
-import org.apache.slider.providers.slideram.SliderAMClientProvider;
 import org.apache.slider.server.appmaster.SliderAppMaster;
 import org.apache.slider.server.appmaster.rpc.RpcBinder;
 import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
+import org.apache.slider.util.ServiceApiUtil;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
+import org.codehaus.jackson.map.PropertyNamingStrategy;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
@@ -182,7 +172,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.Console;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
@@ -191,10 +180,7 @@ import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.io.Writer;
-import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
 import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -213,14 +199,11 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.apache.hadoop.registry.client.binding.RegistryUtils.*;
-import static org.apache.slider.api.InternalKeys.*;
-import static org.apache.slider.api.OptionKeys.*;
-import static org.apache.slider.api.ResourceKeys.*;
+import static org.apache.slider.api.InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH;
 import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG;
 import static org.apache.slider.common.params.SliderActions.*;
 import static org.apache.slider.common.tools.SliderUtils.*;
 
-
 /**
  * Client service for Slider
  */
@@ -246,6 +229,9 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
   public static final String E_NO_RESOURCE_MANAGER = "No valid Resource Manager address provided";
   public static final String E_PACKAGE_EXISTS = "Package exists";
   private static PrintStream clientOutputStream = System.out;
+  private static final JsonSerDeser<Application> jsonSerDeser =
+      new JsonSerDeser<Application>(Application.class,
+          PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
 
   // value should not be changed without updating string find in slider.py
   private static final String PASSWORD_PROMPT = "Enter password for";
@@ -362,16 +348,22 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
             serviceArgs.getActionAMSuicideArgs());
         break;
       
-      case ACTION_BUILD:
-        exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs());
-        break;
-      
       case ACTION_CLIENT:
         exitCode = actionClient(serviceArgs.getActionClientArgs());
         break;
 
       case ACTION_CREATE:
-        exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs());
+        ActionCreateArgs args = serviceArgs.getActionCreateArgs();
+        File file = args.getAppDef();
+        Path filePath = new Path(file.getAbsolutePath());
+        log.info("Loading app definition from: " + filePath);
+        Application application =
+            jsonSerDeser.load(FileSystem.getLocal(getConfig()), filePath);
+        if(args.lifetime > 0) {
+          application.setLifetime(args.lifetime);
+        }
+        application.setName(clusterName);
+        actionCreate(application);
         break;
 
       case ACTION_DEPENDENCY:
@@ -379,7 +371,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
         break;
 
       case ACTION_DESTROY:
-        exitCode = actionDestroy(clusterName, serviceArgs.getActionDestroyArgs());
+        actionDestroy(clusterName);
         break;
 
       case ACTION_DIAGNOSTICS:
@@ -392,11 +384,11 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
         break;
       
       case ACTION_FLEX:
-        exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs());
+        actionFlex(clusterName, serviceArgs.getActionFlexArgs());
         break;
       
-      case ACTION_FREEZE:
-        exitCode = actionFreeze(clusterName, serviceArgs.getActionFreezeArgs());
+      case ACTION_STOP:
+        actionStop(clusterName, serviceArgs.getActionFreezeArgs());
         break;
       
       case ACTION_HELP:
@@ -456,8 +448,8 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
         exitCode = actionStatus(clusterName, serviceArgs.getActionStatusArgs());
         break;
 
-      case ACTION_THAW:
-        exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs());
+      case ACTION_START:
+        exitCode = actionStart(clusterName, serviceArgs.getActionThawArgs());
         break;
 
       case ACTION_TOKENS:
@@ -516,7 +508,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
     String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
     Exception e = null;
     try {
-      Configuration config = getConfig();
       ZKIntegration client = getZkClient(clusterName, user);
       if (client != null) {
         if (client.exists(zkPath)) {
@@ -627,76 +618,31 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
    * force=true by default.
    */
   @Override
-  public int actionDestroy(String clustername) throws YarnException,
-                                                      IOException {
-    ActionDestroyArgs destroyArgs = new ActionDestroyArgs();
-    destroyArgs.force = true;
-    return actionDestroy(clustername, destroyArgs);
-  }
-
-  @Override
-  public int actionDestroy(String clustername,
-      ActionDestroyArgs destroyArgs) throws YarnException, IOException {
-    // verify that a live cluster isn't there
-    validateClusterName(clustername);
-    //no=op, it is now mandatory. 
-    verifyBindingsDefined();
-    verifyNoLiveClusters(clustername, "Destroy");
-    boolean forceDestroy = destroyArgs.force;
-    log.debug("actionDestroy({}, force={})", clustername, forceDestroy);
-
-    // create the directory path
-    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
-    // delete the directory;
+  public void actionDestroy(String appName)
+      throws YarnException, IOException {
+    validateClusterName(appName);
+    Path appDir = sliderFileSystem.buildClusterDirPath(appName);
     FileSystem fs = sliderFileSystem.getFileSystem();
-    boolean exists = fs.exists(clusterDirectory);
-    if (exists) {
-      log.debug("Application Instance {} found at {}: destroying", clustername, clusterDirectory);
-      if (!forceDestroy) {
-        // fail the command if --force is not explicitly specified
-        throw new UsageException("Destroy will permanently delete directories and registries. "
-            + "Reissue this command with the --force option if you want to proceed.");
-      }
-      if (!fs.delete(clusterDirectory, true)) {
-        log.warn("Filesystem returned false from delete() operation");
-      }
-
-      if(!deleteZookeeperNode(clustername)) {
-        log.warn("Unable to perform node cleanup in Zookeeper.");
-      }
-
-      if (fs.exists(clusterDirectory)) {
-        log.warn("Failed to delete {}", clusterDirectory);
+    if (fs.exists(appDir)) {
+      if (fs.delete(appDir, true)) {
+        log.info("Successfully deleted application + " + appName);
+        return;
+      } else {
+        String message =
+            "Failed to delete application + " + appName + " at:  " + appDir;
+        log.info(message);
+        throw new YarnException(message);
       }
-
-    } else {
-      log.debug("Application Instance {} already destroyed", clustername);
-    }
-
-    // rm the registry entry —do not let this block the destroy operations
-    String registryPath = SliderRegistryUtils.registryPathForInstance(
-        clustername);
-    try {
-      getRegistryOperations().delete(registryPath, true);
-    } catch (IOException e) {
-      log.warn("Error deleting registry entry {}: {} ", registryPath, e, e);
-    } catch (SliderException e) {
-      log.warn("Error binding to registry {} ", e, e);
     }
-
-    List<ApplicationReport> instances = findAllLiveInstances(clustername);
-    // detect any race leading to cluster creation during the check/destroy process
-    // and report a problem.
-    if (!instances.isEmpty()) {
-      throw new SliderException(EXIT_APPLICATION_IN_USE,
-                              clustername + ": "
-                              + E_DESTROY_CREATE_RACE_CONDITION
-                              + " :" +
-                              instances.get(0));
+    if (!deleteZookeeperNode(appName)) {
+      String message =
+          "Failed to cleanup cleanup application " + appName + " in zookeeper";
+      log.warn(message);
+      throw new YarnException(message);
     }
-    log.info("Destroyed cluster {}", clustername);
-    return EXIT_SUCCESS;
+    //TODO clean registry
   }
+
   
   @Override
   public int actionAmSuicide(String clustername,
@@ -715,203 +661,285 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
     return factory.createClientProvider();
   }
 
-  /**
-   * Create the cluster -saving the arguments to a specification file first
-   * @param clustername cluster name
-   * @return the status code
-   * @throws YarnException Yarn problems
-   * @throws IOException other problems
-   * @throws BadCommandArgumentsException bad arguments.
-   */
-  public int actionCreate(String clustername, ActionCreateArgs createArgs) throws
-                                               YarnException,
-                                               IOException {
-
-    actionBuild(clustername, createArgs);
-    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
-    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
-        clustername, clusterDirectory);
-    try {
-      checkForCredentials(getConfig(), instanceDefinition.getAppConf(),
-          clustername);
-    } catch (IOException e) {
-      sliderFileSystem.getFileSystem().delete(clusterDirectory, true);
-      throw e;
-    }
-    return startCluster(clustername, createArgs, createArgs.lifetime);
-  }
-
-  @Override
-  public int actionUpgrade(String clustername, ActionUpgradeArgs upgradeArgs)
-      throws YarnException, IOException {
-    File template = upgradeArgs.template;
-    File resources = upgradeArgs.resources;
-    List<String> containers = upgradeArgs.containers;
-    List<String> components = upgradeArgs.components;
-
-    // For upgrade spec, let's be little more strict with validation. If either
-    // --template or --resources is specified, then both needs to be specified.
-    // Otherwise the internal app config and resources states of the app will be
-    // unwantedly modified and the change will take effect to the running app
-    // immediately.
-    require(!(template != null && resources == null),
-          "Option %s must be specified with option %s",
-          Arguments.ARG_RESOURCES, Arguments.ARG_TEMPLATE);
-
-    require(!(resources != null && template == null),
-          "Option %s must be specified with option %s",
-          Arguments.ARG_TEMPLATE, Arguments.ARG_RESOURCES);
-
-    // For upgrade spec, both --template and --resources should be specified
-    // and neither of --containers or --components should be used
-    if (template != null && resources != null) {
-      require(CollectionUtils.isEmpty(containers),
-            "Option %s cannot be specified with %s or %s",
-            Arguments.ARG_CONTAINERS, Arguments.ARG_TEMPLATE,
-            Arguments.ARG_RESOURCES);
-      require(CollectionUtils.isEmpty(components),
-              "Option %s cannot be specified with %s or %s",
-              Arguments.ARG_COMPONENTS, Arguments.ARG_TEMPLATE,
-              Arguments.ARG_RESOURCES);
-
-      // not an error to try to upgrade a stopped cluster, just return success
-      // code, appropriate log messages have already been dumped
-      if (!isAppInRunningState(clustername)) {
-        return EXIT_SUCCESS;
-      }
 
-      // Now initiate the upgrade spec flow
-      buildInstanceDefinition(clustername, upgradeArgs, true, true, true);
-      SliderClusterOperations clusterOperations = createClusterOperations(clustername);
-      clusterOperations.amSuicide("AM restarted for application upgrade", 1, 1000);
-      return EXIT_SUCCESS;
-    }
+  public ApplicationId actionCreate(Application application)
+      throws IOException, YarnException {
+    ServiceApiUtil.validateApplicationPostPayload(application);
+    String appName = application.getName();
+    validateClusterName(appName);
+    verifyNoLiveApp(appName, "Create");
+    Path appDir = checkAppNotExistOnHdfs(application);
 
-    // Since neither --template or --resources were specified, it is upgrade
-    // containers flow. Here any one or both of --containers and --components
-    // can be specified. If a container is specified with --containers option
-    // and also belongs to a component type specified with --components, it will
-    // be upgraded only once.
-    return actionUpgradeContainers(clustername, upgradeArgs);
+    ApplicationId appId = submitApp(application);
+    application.setId(appId.toString());
+    // write app definition on to hdfs
+    persistApp(appDir, application);
+    return appId;
+    //TODO deal with registry
   }
 
-  private int actionUpgradeContainers(String clustername,
-      ActionUpgradeArgs upgradeArgs) throws YarnException, IOException {
-    verifyBindingsDefined();
-    validateClusterName(clustername);
-    int waittime = upgradeArgs.getWaittime(); // ignored for now
-    String text = "Upgrade containers";
-    log.debug("actionUpgradeContainers({}, reason={}, wait={})", clustername,
-        text, waittime);
-
-    // not an error to try to upgrade a stopped cluster, just return success
-    // code, appropriate log messages have already been dumped
-    if (!isAppInRunningState(clustername)) {
-      return EXIT_SUCCESS;
-    }
-
-    // Create sets of containers and components to get rid of duplicates and
-    // for quick lookup during checks below
-    Set<String> containers = new HashSet<>();
-    if (upgradeArgs.containers != null) {
-      containers.addAll(new ArrayList<>(upgradeArgs.containers));
-    }
-    Set<String> components = new HashSet<>();
-    if (upgradeArgs.components != null) {
-      components.addAll(new ArrayList<>(upgradeArgs.components));
-    }
-
-    // check validity of component names and running containers here
-    List<ContainerInformation> liveContainers = getContainers(clustername);
-    Set<String> validContainers = new HashSet<>();
-    Set<String> validComponents = new HashSet<>();
-    for (ContainerInformation liveContainer : liveContainers) {
-      boolean allContainersAndComponentsAccountedFor = true;
-      if (CollectionUtils.isNotEmpty(containers)) {
-        if (containers.contains(liveContainer.containerId)) {
-          containers.remove(liveContainer.containerId);
-          validContainers.add(liveContainer.containerId);
-        }
-        allContainersAndComponentsAccountedFor = false;
-      }
-      if (CollectionUtils.isNotEmpty(components)) {
-        if (components.contains(liveContainer.component)) {
-          components.remove(liveContainer.component);
-          validComponents.add(liveContainer.component);
-        }
-        allContainersAndComponentsAccountedFor = false;
-      }
-      if (allContainersAndComponentsAccountedFor) {
-        break;
+  private ApplicationId submitApp(Application app)
+      throws IOException, YarnException {
+    String appName = app.getName();
+    Configuration conf = getConfig();
+    Path appRootDir = sliderFileSystem.buildClusterDirPath(app.getName());
+    deployedClusterName = appName;
+
+    YarnClientApplication yarnApp =  yarnClient.createApplication();
+    ApplicationSubmissionContext submissionContext =
+        yarnApp.getApplicationSubmissionContext();
+    applicationId = submissionContext.getApplicationId();
+    submissionContext.setKeepContainersAcrossApplicationAttempts(true);
+    if (app.getLifetime() > 0) {
+      Map<ApplicationTimeoutType, Long> appTimeout = new HashMap<>();
+      appTimeout.put(ApplicationTimeoutType.LIFETIME, app.getLifetime());
+      submissionContext.setApplicationTimeouts(appTimeout);
+    }
+    submissionContext.setMaxAppAttempts(conf.getInt(KEY_AM_RESTART_LIMIT, 2));
+
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+
+    // copy local slideram-log4j.properties to hdfs and add to localResources
+    boolean hasSliderAMLog4j =
+        addAMLog4jResource(appName, conf, localResources);
+    // copy jars to hdfs and add to localResources
+    Path tempPath = addJarResource(appName, localResources);
+    // add keytab if in secure env
+    addKeytabResourceIfSecure(sliderFileSystem, localResources, conf, appName);
+    printLocalResources(localResources);
+
+    //TODO SliderAMClientProvider#copyEnvVars
+    //TODO localResource putEnv
+
+    Map<String, String> env = addAMEnv(conf, tempPath);
+
+    // create AM CLI
+    String cmdStr =
+        buildCommandLine(appName, conf, appRootDir, hasSliderAMLog4j);
+
+    //TODO set log aggregation context
+    //TODO set retry window
+    submissionContext.setResource(Resource.newInstance(
+        conf.getLong(KEY_AM_RESOURCE_MEM, DEFAULT_KEY_AM_RESOURCE_MEM), 1));
+    submissionContext.setQueue(conf.get(KEY_YARN_QUEUE, DEFAULT_YARN_QUEUE));
+    submissionContext.setApplicationName(appName);
+    submissionContext.setApplicationType(SliderKeys.APP_TYPE);
+    Set<String> appTags =
+        AbstractClientProvider.createApplicationTags(appName, null, null);
+    if (!appTags.isEmpty()) {
+      submissionContext.setApplicationTags(appTags);
+    }
+    ContainerLaunchContext amLaunchContext =
+        Records.newRecord(ContainerLaunchContext.class);
+    amLaunchContext.setCommands(Collections.singletonList(cmdStr));
+    amLaunchContext.setEnvironment(env);
+    amLaunchContext.setLocalResources(localResources);
+    addCredentialsIfSecure(conf, amLaunchContext);
+    submissionContext.setAMContainerSpec(amLaunchContext);
+    yarnClient.submitApplication(submissionContext);
+    return submissionContext.getApplicationId();
+  }
+
+  private void printLocalResources(Map<String, LocalResource> map) {
+    log.info("Added LocalResource for localization: ");
+    StringBuilder builder = new StringBuilder();
+    for (Map.Entry<String, LocalResource> entry : map.entrySet()) {
+      builder.append(entry.getKey()).append(" -> ")
+          .append(entry.getValue().getResource().getFile())
+          .append(System.lineSeparator());
+    }
+    log.info(builder.toString());
+  }
+
+  private void addCredentialsIfSecure(Configuration conf,
+      ContainerLaunchContext amLaunchContext) throws IOException {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      // pick up oozie credentials
+      Credentials credentials =
+          CredentialUtils.loadTokensFromEnvironment(System.getenv(), conf);
+      if (credentials == null) {
+        // nothing from oozie, so build up directly
+        credentials = new Credentials(
+            UserGroupInformation.getCurrentUser().getCredentials());
+        CredentialUtils.addRMRenewableFSDelegationTokens(conf,
+            sliderFileSystem.getFileSystem(), credentials);
+      } else {
+        log.info("Using externally supplied credentials to launch AM");
       }
+      amLaunchContext.setTokens(CredentialUtils.marshallCredentials(credentials));
     }
+  }
 
-    // If any item remains in containers or components then they are invalid.
-    // Log warning for them and proceed.
-    if (CollectionUtils.isNotEmpty(containers)) {
-      log.warn("Invalid set of containers provided {}", containers);
-    }
-    if (CollectionUtils.isNotEmpty(components)) {
-      log.warn("Invalid set of components provided {}", components);
+  private String buildCommandLine(String appName, Configuration conf,
+      Path appRootDir, boolean hasSliderAMLog4j) throws BadConfigException {
+    JavaCommandLineBuilder CLI = new JavaCommandLineBuilder();
+    CLI.forceIPv4().headless();
+    //TODO CLI.setJVMHeap
+    //TODO CLI.addJVMOPTS
+    if (hasSliderAMLog4j) {
+      CLI.sysprop(SYSPROP_LOG4J_CONFIGURATION, LOG4J_SERVER_PROP_FILENAME);
+      CLI.sysprop(SYSPROP_LOG_DIR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
     }
-
-    // If not a single valid container or component is specified do not proceed
-    if (CollectionUtils.isEmpty(validContainers)
-        && CollectionUtils.isEmpty(validComponents)) {
-      log.error("Not a single valid container or component specified. Nothing to do.");
-      return EXIT_NOT_FOUND;
+    CLI.add(SliderAppMaster.SERVICE_CLASSNAME);
+    CLI.add(ACTION_CREATE, appName);
+    //TODO debugAM CLI.add(Arguments.ARG_DEBUG)
+    CLI.add(Arguments.ARG_CLUSTER_URI, appRootDir.toUri());
+//    InetSocketAddress rmSchedulerAddress = getRmSchedulerAddress(conf);
+//    String rmAddr = NetUtils.getHostPortString(rmSchedulerAddress);
+//    CLI.add(Arguments.ARG_RM_ADDR, rmAddr);
+    // pass the registry binding
+    CLI.addConfOptionToCLI(conf, RegistryConstants.KEY_REGISTRY_ZK_ROOT,
+        RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
+    CLI.addMandatoryConfOption(conf, RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
+    if(isHadoopClusterSecure(conf)) {
+      //TODO Is this required ??
+      // if the cluster is secure, make sure that
+      // the relevant security settings go over
+      CLI.addConfOption(conf, DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
     }
+//    // copy over any/all YARN RM client values, in case the server-side XML conf file
+//    // has the 0.0.0.0 address
+//    CLI.addConfOptions(conf, YarnConfiguration.RM_ADDRESS,
+//        YarnConfiguration.RM_CLUSTER_ID, YarnConfiguration.RM_HOSTNAME,
+//        YarnConfiguration.RM_PRINCIPAL);
 
-    SliderClusterProtocol appMaster = connect(findInstance(clustername));
-    Messages.UpgradeContainersRequestProto r =
-      Messages.UpgradeContainersRequestProto
-              .newBuilder()
-              .setMessage(text)
-              .addAllContainer(validContainers)
-              .addAllComponent(validComponents)
-              .build();
-    appMaster.upgradeContainers(r);
-    log.info("Cluster upgrade issued for -");
-    if (CollectionUtils.isNotEmpty(validContainers)) {
-      log.info(" Containers (total {}): {}", validContainers.size(),
-          validContainers);
-    }
-    if (CollectionUtils.isNotEmpty(validComponents)) {
-      log.info(" Components (total {}): {}", validComponents.size(),
-          validComponents);
-    }
+    // write out the path output
+    CLI.addOutAndErrFiles(STDOUT_AM, STDERR_AM);
+    String cmdStr = CLI.build();
+    log.info("Completed setting up app master command: {}", cmdStr);
+    return cmdStr;
+  }
 
-    return EXIT_SUCCESS;
+  private Map<String, String> addAMEnv(Configuration conf, Path tempPath) // builds the environment map logged below as "AM env"
+      throws IOException { // NOTE(review): tempPath is never read in this method
+    Map<String, String> env = new HashMap<String, String>();
+    ClasspathConstructor classpath =
+        buildClasspath(SliderKeys.SUBMITTED_CONF_DIR, "lib",
+            sliderFileSystem, getUsingMiniMRCluster());
+    env.put("CLASSPATH", classpath.buildClasspath());
+    env.put("LANG", "en_US.UTF-8"); // all three locale variables are pinned to en_US.UTF-8
+    env.put("LC_ALL", "en_US.UTF-8");
+    env.put("LANGUAGE", "en_US.UTF-8");
+    String jaas = System.getenv(HADOOP_JAAS_DEBUG); // read from this (client) process's environment
+    if (jaas != null) {
+      env.put(HADOOP_JAAS_DEBUG, jaas); // forwarded only when it is set on the client
+    }
+    env.putAll(getAmLaunchEnv(conf)); // added last, so these entries replace same-named defaults set above
+    log.info("AM env: \n{}", stringifyMap(env));
+    return env;
+  }
+
+  private Path addJarResource(String appName, // adds the SliderAppMaster jar and its dependencies to localResources
+      Map<String, LocalResource> localResources)
+      throws IOException, SliderException {
+    Path libPath = sliderFileSystem.buildClusterDirPath(appName); // this cluster dir path is also the return value
+    ProviderUtils
+        .addProviderJar(localResources, SliderAppMaster.class, SLIDER_JAR,
+            sliderFileSystem, libPath, "lib", false);
+    Path dependencyLibTarGzip = sliderFileSystem.getDependencyTarGzip();
+    if (sliderFileSystem.isFile(dependencyLibTarGzip)) { // use the dependency tarball when it is present as a file
+      log.info("Loading lib tar from " + sliderFileSystem.getFileSystem()
+          .getScheme() + ": "  + dependencyLibTarGzip);
+      SliderUtils.putAmTarGzipAndUpdate(localResources, sliderFileSystem);
+    } else { // otherwise add dependency jars from every entry returned by SliderUtils.getLibDirs()
+      String[] libs = SliderUtils.getLibDirs();
+      log.info("Loading dependencies from local file system: " + Arrays
+          .toString(libs));
+      for (String libDirProp : libs) {
+        ProviderUtils
+            .addAllDependencyJars(localResources, sliderFileSystem, libPath,
+                "lib", libDirProp);
+      }
+    }
+    return libPath;
+  }
+
+  private boolean addAMLog4jResource(String appName, Configuration conf, // true only if the log4j file was found, copied and registered
+      Map<String, LocalResource> localResources)
+      throws IOException, BadClusterStateException {
+    boolean hasSliderAMLog4j = false;
+    String hadoopConfDir =
+        System.getenv(ApplicationConstants.Environment.HADOOP_CONF_DIR.name());
+    if (hadoopConfDir != null) { // nothing is done when HADOOP_CONF_DIR is unset
+      File localFile =
+          new File(hadoopConfDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME);
+      if (localFile.exists()) {
+        Path localFilePath = createLocalPath(localFile);
+        Path appDirPath = sliderFileSystem.buildClusterDirPath(appName);
+        Path remoteConfPath =
+            new Path(appDirPath, SliderKeys.SUBMITTED_CONF_DIR);
+        Path remoteFilePath =
+            new Path(remoteConfPath, SliderKeys.LOG4J_SERVER_PROP_FILENAME);
+        copy(conf, localFilePath, remoteFilePath); // destination is <appDir>/<SUBMITTED_CONF_DIR>/<log4j file name>
+        LocalResource localResource = sliderFileSystem // NOTE(review): resource is built from remoteConfPath (the directory),
+            .createAmResource(remoteConfPath, LocalResourceType.FILE); // not remoteFilePath, yet typed FILE - confirm intended
+        localResources.put(localFilePath.getName(), localResource); // keyed by the bare file name
+        hasSliderAMLog4j = true;
+      }
+    }
+    return hasSliderAMLog4j;
+  }
+
+  private Path checkAppNotExistOnHdfs(Application application) // returns the app's cluster dir once the check passes
+      throws IOException, SliderException {
+    Path appDir = sliderFileSystem.buildClusterDirPath(application.getName());
+    sliderFileSystem.verifyDirectoryNonexistent( // the path checked is <appDir>/<name>.json, not appDir itself
+        new Path(appDir, application.getName() + ".json"));
+    return appDir;
   }
 
-  // returns true if and only if app is in RUNNING state
-  private boolean isAppInRunningState(String clustername) throws YarnException,
-      IOException {
-    // is this actually a known cluster?
-    sliderFileSystem.locateInstanceDefinition(clustername);
-    ApplicationReport app = findInstance(clustername);
-    if (app == null) {
-      // exit early
-      log.info("Cluster {} not running", clustername);
-      return false;
-    }
-    log.debug("App to upgrade was found: {}:\n{}", clustername,
-        new OnDemandReportStringifier(app));
-    if (app.getYarnApplicationState().ordinal() >= YarnApplicationState.FINISHED.ordinal()) {
-      log.info("Cluster {} is in a terminated state {}. Use command '{}' instead.",
-          clustername, app.getYarnApplicationState(), ACTION_UPDATE);
-      return false;
+  private void persistApp(Path appDir, Application application) // creates appDir and saves the application as <name>.json in it
+      throws IOException, SliderException {
+    FsPermission appDirPermission = new FsPermission("777"); // NOTE(review): world-writable app dir - confirm this is intended
+    sliderFileSystem.createWithPermissions(appDir, appDirPermission);
+    Path appJson = new Path(appDir, application.getName() + ".json");
+    jsonSerDeser
+        .save(sliderFileSystem.getFileSystem(), appJson, application, true); // presumably true = overwrite existing; verify
+    log.info(
+        "Persisted application " + application.getName() + " at " + appJson);
+  }
+
+  private void addKeytabResourceIfSecure(SliderFileSystem fileSystem, // registers the AM keytab found on the file system, if any
+      Map<String, LocalResource> localResource, Configuration conf,
+      String appName) throws IOException, BadConfigException {
+    if (!UserGroupInformation.isSecurityEnabled()) { // no-op when Hadoop security is disabled
+      return;
     }
-
-    // IPC request to upgrade containers is possible if the app is running.
-    if (app.getYarnApplicationState().ordinal() < YarnApplicationState.RUNNING
-        .ordinal()) {
-      log.info("Cluster {} is in a pre-running state {}. To upgrade it needs "
-          + "to be RUNNING.", clustername, app.getYarnApplicationState());
-      return false;
+    String keytabPreInstalledOnHost =
+        conf.get(SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH);
+    if (StringUtils.isEmpty(keytabPreInstalledOnHost)) { // when a host-local keytab path is configured, nothing is added here
+      String amKeytabName =
+          conf.get(SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME);
+      String keytabDir = conf.get(SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR);
+      Path keytabPath = // NOTE(review): amKeytabName/keytabDir may be null if unset - confirm buildKeytabPath copes
+          fileSystem.buildKeytabPath(keytabDir, amKeytabName, appName);
+      if (fileSystem.getFileSystem().exists(keytabPath)) {
+        LocalResource keytabRes =
+            fileSystem.createAmResource(keytabPath, LocalResourceType.FILE);
+        localResource
+            .put(SliderKeys.KEYTAB_DIR + "/" + amKeytabName, keytabRes);
+        log.info("Adding AM keytab on hdfs: " + keytabPath);
+      } else {
+        log.warn("No keytab file was found at {}.", keytabPath);
+        if (conf.getBoolean(KEY_AM_LOGIN_KEYTAB_REQUIRED, false)) { // a missing keytab is fatal only when this flag is true
+          throw new BadConfigException("No keytab file was found at %s.",
+              keytabPath);
+        } else {
+          log.warn("The AM will be "
+              + "started without a kerberos authenticated identity. "
+              + "The application is therefore not guaranteed to remain "
+              + "operational beyond 24 hours.");
+        }
+      }
     }
+  }
 
-    return true;
+  @Override
+  public int actionUpgrade(String clustername, ActionUpgradeArgs upgradeArgs)
+      throws YarnException, IOException {
+    // TODO: upgrade is not implemented yet; this stub always returns 0.
+    return 0;
   }
 
   protected static void checkForCredentials(Configuration conf,
@@ -952,15 +980,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
     }
   }
 
-  private static char[] readOnePassword(String alias) throws IOException {
-    Console console = System.console();
-    if (console == null) {
-      throw new IOException("Unable to input password for " + alias +
-          " because System.console() is null");
-    }
-    return readPassword(alias, console);
-  }
-
   private static char[] readPassword(String alias, Console console)
       throws IOException {
     char[] cred = null;
@@ -987,16 +1006,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
   }
 
   @Override
-  public int actionBuild(String clustername,
-      AbstractClusterBuildingActionArgs buildInfo) throws
-                                               YarnException,
-                                               IOException {
-
-    buildInstanceDefinition(clustername, buildInfo, false, false);
-    return EXIT_SUCCESS; 
-  }
-
-  @Override
   public int actionKeytab(ActionKeytabArgs keytabInfo)
       throws YarnException, IOException {
     if (keytabInfo.install) {
@@ -1527,12 +1536,12 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
     if (buildInfo.lifetime > 0) {
       updateLifetime(clustername, buildInfo.lifetime);
     } else {
-      buildInstanceDefinition(clustername, buildInfo, true, true);
+      // TODO: upgrade. The buildInstanceDefinition call was removed, so this branch currently does nothing.
     }
     return EXIT_SUCCESS;
   }
 
-  public void updateLifetime(String appName, long lifetime)
+  public String updateLifetime(String appName, long lifetime)
       throws YarnException, IOException {
     EnumSet<YarnApplicationState> appStates = EnumSet.range(
         YarnApplicationState.NEW, YarnApplicationState.RUNNING);
@@ -1553,396 +1562,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
     log.info("Successfully updated lifetime for an application: appName = "
         + appName + ", appId = " + appId
         + ". New expiry time in ISO8601 format is " + newTimeout);
-  }
-
-  /**
-   * Build up the AggregateConfiguration for an application instance then
-   * persists it
-   * @param clustername name of the cluster
-   * @param buildInfo the arguments needed to build the cluster
-   * @param overwrite true if existing cluster directory can be overwritten
-   * @param liveClusterAllowed true if live cluster can be modified
-   * @throws YarnException
-   * @throws IOException
-   */
-
-  public void buildInstanceDefinition(String clustername,
-      AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
-      boolean liveClusterAllowed) throws YarnException, IOException {
-    buildInstanceDefinition(clustername, buildInfo, overwrite,
-        liveClusterAllowed, false);
-  }
-
-  public void buildInstanceDefinition(String clustername,
-      AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
-      boolean liveClusterAllowed, boolean isUpgradeFlow) throws YarnException,
-      IOException {
-    // verify that a live cluster isn't there
-    validateClusterName(clustername);
-    verifyBindingsDefined();
-    if (!liveClusterAllowed) {
-      verifyNoLiveClusters(clustername, "Create");
-    }
-
-    Configuration conf = getConfig();
-    String registryQuorum = lookupZKQuorum();
-
-    Path appconfdir = buildInfo.getConfdir();
-    // Provider
-    String providerName = buildInfo.getProvider();
-    requireArgumentSet(Arguments.ARG_PROVIDER, providerName);
-    log.debug("Provider is {}", providerName);
-    SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf);
-    AbstractClientProvider provider =
-      createClientProvider(providerName);
-    InstanceBuilder builder =
-      new InstanceBuilder(sliderFileSystem, 
-                          getConfig(),
-                          clustername);
-    
-    AggregateConf instanceDefinition = new AggregateConf();
-    ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
-    ConfTreeOperations resources = instanceDefinition.getResourceOperations();
-    ConfTreeOperations internal = instanceDefinition.getInternalOperations();
-    //initial definition is set by the providers 
-    sliderAM.prepareInstanceConfiguration(instanceDefinition);
-    provider.prepareInstanceConfiguration(instanceDefinition);
-
-    //load in any specified on the command line
-    if (buildInfo.resources != null) {
-      try {
-        resources.mergeFile(buildInfo.resources,
-                            new ResourcesInputPropertiesValidator());
-
-      } catch (IOException e) {
-        throw new BadConfigException(e,
-               "incorrect argument to %s: \"%s\" : %s ", 
-                                     Arguments.ARG_RESOURCES,
-                                     buildInfo.resources,
-                                     e.toString());
-      }
-    }
-    if (buildInfo.template != null) {
-      try {
-        appConf.mergeFile(buildInfo.template,
-                          new TemplateInputPropertiesValidator());
-      } catch (IOException e) {
-        throw new BadConfigException(e,
-                                     "incorrect argument to %s: \"%s\" : %s ",
-                                     Arguments.ARG_TEMPLATE,
-                                     buildInfo.template,
-                                     e.toString());
-      }
-    }
-
-    if (isUpgradeFlow) {
-      ActionUpgradeArgs upgradeInfo = (ActionUpgradeArgs) buildInfo;
-      if (!upgradeInfo.force) {
-        validateClientAndClusterResource(clustername, resources);
-      }
-    }
-
-    //get the command line options
-    ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree();
-    ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree();
-
-    appConf.merge(cmdLineAppOptions);
-
-    AppDefinitionPersister appDefinitionPersister = new AppDefinitionPersister(sliderFileSystem);
-    appDefinitionPersister.processSuppliedDefinitions(clustername, buildInfo, appConf);
-
-    // put the role counts into the resources file
-    Map<String, String> argsRoleMap = buildInfo.getComponentMap();
-    for (Map.Entry<String, String> roleEntry : argsRoleMap.entrySet()) {
-      String count = roleEntry.getValue();
-      String key = roleEntry.getKey();
-      log.info("{} => {}", key, count);
-      resources.getOrAddComponent(key).put(COMPONENT_INSTANCES, count);
-    }
-
-    //all CLI role options
-    Map<String, Map<String, String>> appOptionMap =
-      buildInfo.getCompOptionMap();
-    appConf.mergeComponents(appOptionMap);
-
-    //internal picks up core. values only
-    internal.propagateGlobalKeys(appConf, "slider.");
-    internal.propagateGlobalKeys(appConf, "internal.");
-
-    //copy over role. and yarn. values ONLY to the resources
-    if (PROPAGATE_RESOURCE_OPTION) {
-      resources.propagateGlobalKeys(appConf, "component.");
-      resources.propagateGlobalKeys(appConf, "role.");
-      resources.propagateGlobalKeys(appConf, "yarn.");
-      resources.mergeComponentsPrefix(appOptionMap, "component.", true);
-      resources.mergeComponentsPrefix(appOptionMap, "yarn.", true);
-      resources.mergeComponentsPrefix(appOptionMap, "role.", true);
-    }
-
-    // resource component args
-    appConf.merge(cmdLineResourceOptions);
-    resources.merge(cmdLineResourceOptions);
-    resources.mergeComponents(buildInfo.getResourceCompOptionMap());
-
-    builder.init(providerName, instanceDefinition);
-    builder.resolve();
-    builder.propagateFilename();
-    builder.propagatePrincipals();
-    builder.setImageDetailsIfAvailable(buildInfo.getImage(),
-                                       buildInfo.getAppHomeDir());
-    builder.setQueue(buildInfo.queue);
-
-    String quorum = buildInfo.getZKhosts();
-    if (isUnset(quorum)) {
-      quorum = registryQuorum;
-    }
-    if (isUnset(quorum)) {
-      throw new BadConfigException(E_NO_ZOOKEEPER_QUORUM);
-    }
-    ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(),
-        getUsername(),
-        clustername,
-        registryQuorum,
-        quorum);
-    String zookeeperRoot = buildInfo.getAppZKPath();
-
-    if (isSet(zookeeperRoot)) {
-      zkPaths.setAppPath(zookeeperRoot);
-    } else {
-      String createDefaultZkNode = appConf.getGlobalOptions()
-          .getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false");
-      if (createDefaultZkNode.equals("true")) {
-        String defaultZKPath = createZookeeperNode(clustername, false);
-        log.debug("ZK node created for application instance: {}", defaultZKPath);
-        if (defaultZKPath != null) {
-          zkPaths.setAppPath(defaultZKPath);
-        }
-      } else {
-        // create AppPath if default is being used
-        String defaultZKPath = createZookeeperNode(clustername, true);
-        log.debug("ZK node assigned to application instance: {}", defaultZKPath);
-        zkPaths.setAppPath(defaultZKPath);
-      }
-    }
-
-    builder.addZKBinding(zkPaths);
-
-    //then propagate any package URI
-    if (buildInfo.packageURI != null) {
-      appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI);
-    }
-
-    propagatePythonExecutable(conf, instanceDefinition);
-
-    // make any substitutions needed at this stage
-    replaceTokens(appConf.getConfTree(), getUsername(), clustername);
-
-    // TODO: Refactor the validation code and persistence code
-    try {
-      persistInstanceDefinition(overwrite, appconfdir, builder);
-      appDefinitionPersister.persistPackages();
-
-    } catch (LockAcquireFailedException e) {
-      log.warn("Failed to get a Lock on {} : {}", builder, e, e);
-      throw new BadClusterStateException("Failed to save " + clustername
-                                         + ": " + e);
-    }
-
-    // providers to validate what there is
-    // TODO: Validation should be done before persistence
-    AggregateConf instanceDescription = builder.getInstanceDescription();
-    validateInstanceDefinition(sliderAM, instanceDescription, sliderFileSystem);
-    validateInstanceDefinition(provider, instanceDescription, sliderFileSystem);
-  }
-
-  private void validateClientAndClusterResource(String clustername,
-      ConfTreeOperations clientResources) throws BadClusterStateException,
-      SliderException, IOException {
-    log.info("Validating upgrade resource definition with current cluster "
-        + "state (components and instance count)");
-    Map<String, Integer> clientComponentInstances = new HashMap<>();
-    for (String componentName : clientResources.getComponentNames()) {
-      if (!SliderKeys.COMPONENT_AM.equals(componentName)) {
-        clientComponentInstances.put(componentName, clientResources
-            .getComponentOptInt(componentName,
-                COMPONENT_INSTANCES, -1));
-      }
-    }
-
-    AggregateConf clusterConf = null;
-    try {
-      clusterConf = loadPersistedClusterDescription(clustername);
-    } catch (LockAcquireFailedException e) {
-      log.warn("Failed to get a Lock on cluster resource : {}", e, e);
-      throw new BadClusterStateException(
-          "Failed to load client resource definition " + clustername + ": " + e, e);
-    }
-    Map<String, Integer> clusterComponentInstances = new HashMap<>();
-    for (Map.Entry<String, Map<String, String>> component : clusterConf
-        .getResources().components.entrySet()) {
-      if (!SliderKeys.COMPONENT_AM.equals(component.getKey())) {
-        clusterComponentInstances.put(
-            component.getKey(),
-            Integer.decode(component.getValue().get(
-                COMPONENT_INSTANCES)));
-      }
-    }
-
-    // client and cluster should be an exact match
-    Iterator<Map.Entry<String, Integer>> clientComponentInstanceIt = clientComponentInstances
-        .entrySet().iterator();
-    while (clientComponentInstanceIt.hasNext()) {
-      Map.Entry<String, Integer> clientComponentInstanceEntry = clientComponentInstanceIt.next();
-      if (clusterComponentInstances.containsKey(clientComponentInstanceEntry.getKey())) {
-        // compare instance count now and remove from both maps if they match
-        if (clusterComponentInstances
-            .get(clientComponentInstanceEntry.getKey()).intValue() == clientComponentInstanceEntry
-            .getValue().intValue()) {
-          clusterComponentInstances.remove(clientComponentInstanceEntry
-              .getKey());
-          clientComponentInstanceIt.remove();
-        }
-      }
-    }
-
-    if (!clientComponentInstances.isEmpty()
-        || !clusterComponentInstances.isEmpty()) {
-      log.error("Mismatch found in upgrade resource definition and cluster "
-          + "resource state");
-      if (!clientComponentInstances.isEmpty()) {
-        log.info("The upgrade resource definitions that do not match are:");
-        for (Map.Entry<String, Integer> clientComponentInstanceEntry : clientComponentInstances
-            .entrySet()) {
-          log.info("    Component Name: {}, Instance count: {}",
-              clientComponentInstanceEntry.getKey(),
-              clientComponentInstanceEntry.getValue());
-        }
-      }
-      if (!clusterComponentInstances.isEmpty()) {
-        log.info("The cluster resources that do not match are:");
-        for (Map.Entry<String, Integer> clusterComponentInstanceEntry : clusterComponentInstances
-            .entrySet()) {
-          log.info("    Component Name: {}, Instance count: {}",
-              clusterComponentInstanceEntry.getKey(),
-              clusterComponentInstanceEntry.getValue());
-        }
-      }
-      throw new BadConfigException("Resource definition provided for "
-          + "upgrade does not match with that of the currently running "
-          + "cluster.\nIf you are aware of what you are doing, rerun the "
-          + "command with " + Arguments.ARG_FORCE + " option.");
-    }
-  }
-
-  protected void persistInstanceDefinition(boolean overwrite,
-                                         Path appconfdir,
-                                         InstanceBuilder builder)
-      throws IOException, SliderException, LockAcquireFailedException {
-    builder.persist(appconfdir, overwrite);
-  }
-
-  @VisibleForTesting
-  public static void replaceTokens(ConfTree conf,
-      String userName, String clusterName) throws IOException {
-    Map<String,String> newglobal = new HashMap<>();
-    for (Entry<String,String> entry : conf.global.entrySet()) {
-      newglobal.put(entry.getKey(), replaceTokens(entry.getValue(),
-          userName, clusterName));
-    }
-    conf.global.putAll(newglobal);
-
-    for (String component : conf.components.keySet()) {
-      Map<String,String> newComponent = new HashMap<>();
-      for (Entry<String,String> entry : conf.components.get(component).entrySet()) {
-        newComponent.put(entry.getKey(), replaceTokens(entry.getValue(),
-            userName, clusterName));
-      }
-      conf.components.get(component).putAll(newComponent);
-    }
-
-    Map<String,List<String>> newcred = new HashMap<>();
-    for (Entry<String,List<String>> entry : conf.credentials.entrySet()) {
-      List<String> resultList = new ArrayList<>();
-      for (String v : entry.getValue()) {
-        resultList.add(replaceTokens(v, userName, clusterName));
-      }
-      newcred.put(replaceTokens(entry.getKey(), userName, clusterName),
-          resultList);
-    }
-    conf.credentials.clear();
-    conf.credentials.putAll(newcred);
-  }
-
-  private static String replaceTokens(String s, String userName,
-      String clusterName) throws IOException {
-    return s.replaceAll(Pattern.quote("${USER}"), userName)
-        .replaceAll(Pattern.quote("${USER_NAME}"), userName);
-  }
-
-  public FsPermission getClusterDirectoryPermissions(Configuration conf) {
-    String clusterDirPermsOct =
-      conf.get(CLUSTER_DIRECTORY_PERMISSIONS, DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS);
-    return new FsPermission(clusterDirPermsOct);
-  }
-
-  /**
-   * Verify that the Resource Manager is configured (on a non-HA cluster).
-   * with a useful error message
-   * @throws BadCommandArgumentsException the exception raised on an invalid config
-   */
-  public void verifyBindingsDefined() throws BadCommandArgumentsException {
-    InetSocketAddress rmAddr = getRmAddress(getConfig());
-    if (!getConfig().getBoolean(YarnConfiguration.RM_HA_ENABLED, false)
-     && !isAddressDefined(rmAddr)) {
-      throw new BadCommandArgumentsException(
-        E_NO_RESOURCE_MANAGER
-        + " in the argument "
-        + Arguments.ARG_MANAGER
-        + " or the configuration property "
-        + YarnConfiguration.RM_ADDRESS 
-        + " value :" + rmAddr);
-    }
-  }
-
-  /**
-   * Load and start a cluster specification.
-   * This assumes that all validation of args and cluster state
-   * have already taken place
-   *
-   * @param clustername name of the cluster.
-   * @param launchArgs launch arguments
-   * @param lifetime
-   * @return the exit code
-   * @throws YarnException
-   * @throws IOException
-   */
-  protected int startCluster(String clustername, LaunchArgsAccessor launchArgs,
-      long lifetime) throws YarnException, IOException {
-    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
-    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
-      clustername,
-      clusterDirectory);
-
-    LaunchedApplication launchedApplication =
-      launchApplication(clustername, clusterDirectory, instanceDefinition,
-                        serviceArgs.isDebug(), lifetime);
-
-    if (launchArgs.getOutputFile() != null) {
-      // output file has been requested. Get the app report and serialize it
-      ApplicationReport report =
-          launchedApplication.getApplicationReport();
-      SerializedApplicationReport sar = new SerializedApplicationReport(report);
-      sar.submitTime = System.currentTimeMillis();
-      ApplicationReportSerDeser serDeser = new ApplicationReportSerDeser();
-      serDeser.save(sar, launchArgs.getOutputFile());
-    }
-    int waittime = launchArgs.getWaittime();
-    if (waittime > 0) {
-      return waitForAppRunning(launchedApplication, waittime, waittime);
-    } else {
-      // no waiting
-      return EXIT_SUCCESS;
-    }
+    return newTimeout;
   }
 
   /**
@@ -1968,415 +1588,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
     }
   }
 
-  /**
-   * Load the instance definition. 
-   * @param name cluster name
-   * @param resolved flag to indicate the cluster should be resolved
-   * @return the loaded configuration
-   * @throws IOException IO problems
-   * @throws SliderException slider explicit issues
-   * @throws UnknownApplicationInstanceException if the file is not found
-   */
-    public AggregateConf loadInstanceDefinition(String name,
-        boolean resolved) throws
-        IOException,
-        SliderException {
-
-    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name);
-    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
-      name,
-      clusterDirectory);
-    if (resolved) {
-      instanceDefinition.resolve();
-    }
-    return instanceDefinition;
-
-  }
-
-  protected AppMasterLauncher setupAppMasterLauncher(String clustername,
-      Path clusterDirectory, AggregateConf instanceDefinition, boolean debugAM,
-      long lifetime)
-    throws YarnException, IOException{
-    deployedClusterName = clustername;
-    validateClusterName(clustername);
-    verifyNoLiveClusters(clustername, "Launch");
-    Configuration config = getConfig();
-    lookupZKQuorum();
-    boolean clusterSecure = isHadoopClusterSecure(config);
-    //create the Slider AM provider -this helps set up the AM
-    SliderAMClientProvider sliderAM = new SliderAMClientProvider(config);
-
-    instanceDefinition.resolve();
-    launchedInstanceDefinition = instanceDefinition;
-
-    ConfTreeOperations internalOperations = instanceDefinition.getInternalOperations();
-    MapOperations internalOptions = internalOperations.getGlobalOptions();
-    ConfTreeOperations resourceOperations = instanceDefinition.getResourceOperations();
-    ConfTreeOperations appOperations = instanceDefinition.getAppConfOperations();
-    Path generatedConfDirPath =
-      createPathThatMustExist(internalOptions.getMandatoryOption(
-        INTERNAL_GENERATED_CONF_PATH));
-    Path snapshotConfPath =
-      createPathThatMustExist(internalOptions.getMandatoryOption(
-        INTERNAL_SNAPSHOT_CONF_PATH));
-
-
-    // cluster Provider
-    AbstractClientProvider provider = createClientProvider(
-      internalOptions.getMandatoryOption(INTERNAL_PROVIDER_NAME));
-    if (log.isDebugEnabled()) {
-      log.debug(instanceDefinition.toString());
-    }
-    MapOperations sliderAMResourceComponent =
-      resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM);
-    MapOperations resourceGlobalOptions = resourceOperations.getGlobalOptions();
-
-    // add the tags if available
-    Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem,
-        appOperations, clustername);
-
-    Credentials credentials = null;
-    if (clusterSecure) {
-      // pick up oozie credentials
-      credentials = CredentialUtils.loadTokensFromEnvironment(System.getenv(),
-          config);
-      if (credentials == null) {
-        // nothing from oozie, so build up directly
-        credentials = new Credentials(
-            UserGroupInformation.getCurrentUser().getCredentials());
-        CredentialUtils.addRMRenewableFSDelegationTokens(config,
-            sliderFileSystem.getFileSystem(),
-            credentials);
-        CredentialUtils.addRMDelegationToken(yarnClient, credentials);
-
-      } else {
-        log.info("Using externally supplied credentials to launch AM");
-      }
-    }
-
-    AppMasterLauncher amLauncher = new AppMasterLauncher(clustername,
-        SliderKeys.APP_TYPE,
-        config,
-        sliderFileSystem,
-        yarnClient,
-        clusterSecure,
-        sliderAMResourceComponent,
-        resourceGlobalOptions,
-        applicationTags,
-        credentials);
-
-    ApplicationId appId = amLauncher.getApplicationId();
-    // set the application name;
-    amLauncher.setKeepContainersOverRestarts(true);
-    // set lifetime in submission context;
-    Map<ApplicationTimeoutType, Long> appTimeout = new HashMap<>();
-    if (lifetime > 0) {
-      appTimeout.put(ApplicationTimeoutType.LIFETIME, lifetime);
-    }
-    amLauncher.submissionContext.setApplicationTimeouts(appTimeout);
-    int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0);
-    amLauncher.setMaxAppAttempts(maxAppAttempts);
-
-    sliderFileSystem.purgeAppInstanceTempFiles(clustername);
-    Path tempPath = sliderFileSystem.createAppInstanceTempPath(
-        clustername,
-        appId.toString() + "/am");
-    String libdir = "lib";
-    Path libPath = new Path(tempPath, libdir);
-    sliderFileSystem.getFileSystem().mkdirs(libPath);
-    log.debug("FS={}, tempPath={}, libdir={}", sliderFileSystem, tempPath, libPath);
- 
-    // set local resources for the application master
-    // local files or archives as needed
-    // In this scenario, the jar file for the application master is part of the local resources
-    Map<String, LocalResource> localResources = amLauncher.getLocalResources();
-    
-    // look for the configuration directory named on the command line
-    boolean hasServerLog4jProperties = false;
-    Path remoteConfPath = null;
-    String relativeConfDir = null;
-    String confdirProp = System.getProperty(SliderKeys.PROPERTY_CONF_DIR);
-    if (isUnset(confdirProp)) {
-      log.debug("No local configuration directory provided as system property");
-    } else {
-      File confDir = new File(confdirProp);
-      if (!confDir.exists()) {
-        throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND,
-                                     confDir);
-      }
-      Path localConfDirPath = createLocalPath(confDir);
-      remoteConfPath = new Path(clusterDirectory, SliderKeys.SUBMITTED_CONF_DIR);
-      log.debug("Slider configuration directory is {}; remote to be {}", 
-          localConfDirPath, remoteConfPath);
-      copyDirectory(config, localConfDirPath, remoteConfPath, null);
-
-      File log4jserver =
-          new File(confDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME);
-      hasServerLog4jProperties = log4jserver.isFile();
-    }
-    if (!hasServerLog4jProperties) {
-      // check for log4j properties in hadoop conf dir
-      String hadoopConfDir = System.getenv(ApplicationConstants.Environment
-          .HADOOP_CONF_DIR.name());
-      if (hadoopConfDir != null) {
-        File localFile = new File(hadoopConfDir, SliderKeys
-            .LOG4J_SERVER_PROP_FILENAME);
-        if (localFile.exists()) {
-          Path localFilePath = createLocalPath(localFile);
-          remoteConfPath = new Path(clusterDirectory,
-              SliderKeys.SUBMITTED_CONF_DIR);
-          Path remoteFilePath = new Path(remoteConfPath, SliderKeys
-              .LOG4J_SERVER_PROP_FILENAME);
-          copy(config, localFilePath, remoteFilePath);
-          hasServerLog4jProperties = true;
-        }
-      }
-    }
-    // the assumption here is that minimr cluster => this is a test run
-    // and the classpath can look after itself
-
-    boolean usingMiniMRCluster = getUsingMiniMRCluster();
-    if (!usingMiniMRCluster) {
-
-      log.debug("Destination is not a MiniYARNCluster -copying full classpath");
-
-      // insert conf dir first
-      if (remoteConfPath != null) {
-        relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR;
-        Map<String, LocalResource> submittedConfDir =
-          sliderFileSystem.submitDirectory(remoteConfPath,
-                                         relativeConfDir);
-        mergeMaps(localResources, submittedConfDir);
-      }
-    }
-    // build up the configuration 
-    // IMPORTANT: it is only after this call that site configurations
-    // will be valid.
-
-    propagatePrincipals(config, instanceDefinition);
-    // validate security data
-
-/*
-    // turned off until tested
-    SecurityConfiguration securityConfiguration =
-        new SecurityConfiguration(config,
-            instanceDefinition, clustername);
-    
-*/
-    Configuration clientConfExtras = new Configuration(false);
-    // then build up the generated path.
-    FsPermission clusterPerms = getClusterDirectoryPermissions(config);
-    copyDirectory(config, snapshotConfPath, generatedConfDirPath,
-        clusterPerms);
-
-
-    // standard AM resources
-    sliderAM.prepareAMAndConfigForLaunch(sliderFileSystem,
-                                       config,
-                                       amLauncher,
-                                       instanceDefinition,
-                                       snapshotConfPath,
-                                       generatedConfDirPath,
-                                       clientConfExtras,
-                                       libdir,
-                                       tempPath,
-                                       usingMiniMRCluster);
-    //add provider-specific resources
-    provider.prepareAMAndConfigForLaunch(sliderFileSystem,
-                                         config,
-                                         amLauncher,
-                                         instanceDefinition,
-                                         snapshotConfPath,
-                                         generatedConfDirPath,
-                                         clientConfExtras,
-                                         libdir,
-                                         tempPath,
-                                         usingMiniMRCluster);
-
-    // now that the site config is fully generated, the provider gets
-    // to do a quick review of them.
-    log.debug("Preflight validation of cluster configuration");
-
-
-    sliderAM.preflightValidateClusterConfiguration(sliderFileSystem,
-                                                 clustername,
-                                                 config,
-                                                 instanceDefinition,
-                                                 clusterDirectory,
-                                                 generatedConfDirPath,
-                                                 clusterSecure
-                                                );
-
-    provider.preflightValidateClusterConfiguration(sliderFileSystem,
-                                                   clustername,
-                                                   config,
-                                                   instanceDefinition,
-                                                   clusterDirectory,
-                                                   generatedConfDirPath,
-                                                   clusterSecure
-                                                  );
-
-
-    if (!(provider instanceof DockerClientProvider)) {
-      Path imagePath =
-          extractImagePath(sliderFileSystem, internalOptions);
-      if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) {
-        log.debug("Registered image path {}", imagePath);
-      }
-    }
-
-    // build the environment
-    amLauncher.putEnv(
-      buildEnvMap(sliderAMResourceComponent));
-    ClasspathConstructor classpath = buildClasspath(relativeConfDir,
-        libdir,
-        getConfig(),
-        sliderFileSystem,
-        usingMiniMRCluster);
-    amLauncher.setClasspath(classpath);
-    //add english env
-    amLauncher.setEnv("LANG", "en_US.UTF-8");
-    amLauncher.setEnv("LC_ALL", "en_US.UTF-8");
-    amLauncher.setEnv("LANGUAGE", "en_US.UTF-8");
-    amLauncher.maybeSetEnv(HADOOP_JAAS_DEBUG,
-        System.getenv(HADOOP_JAAS_DEBUG));
-    amLauncher.putEnv(getAmLaunchEnv(config));
-
-    for (Map.Entry<String, String> envs : getSystemEnv().entrySet()) {
-      log.debug("System env {}={}", envs.getKey(), envs.getValue());
-    }
-    if (log.isDebugEnabled()) {
-      log.debug("AM classpath={}", classpath);
-      log.debug("Environment Map:\n{}",
-                stringifyMap(amLauncher.getEnv()));
-      log.debug("Files in lib path\n{}", sliderFileSystem.listFSDir(libPath));
-    }
-
-    // rm address
-
-    InetSocketAddress rmSchedulerAddress;
-    try {
-      rmSchedulerAddress = getRmSchedulerAddress(config);
-    } catch (IllegalArgumentException e) {
-      throw new BadConfigException("%s Address invalid: %s",
-               YarnConfiguration.RM_SCHEDULER_ADDRESS,
-               config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS));
-    }
-    String rmAddr = NetUtils.getHostPortString(rmSchedulerAddress);
-
-    JavaCommandLineBuilder commandLine = new JavaCommandLineBuilder();
-    // insert any JVM options);
-    sliderAM.addJVMOptions(instanceDefinition, commandLine);
-    // enable asserts
-    commandLine.enableJavaAssertions();
-    
-    // if the conf dir has a slideram-log4j.properties, switch to that
-    if (hasServerLog4jProperties) {
-      commandLine.sysprop(SYSPROP_LOG4J_CONFIGURATION, LOG4J_SERVER_PROP_FILENAME);
-      commandLine.sysprop(SYSPROP_LOG_DIR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
-    }
-    
-    // add the AM sevice entry point
-    commandLine.add(SliderAppMaster.SERVICE_CLASSNAME);
-
-    // create action and the cluster name
-    commandLine.add(ACTION_CREATE, clustername);
-
-    // debug
-    if (debugAM) {
-      commandLine.add(Arguments.ARG_DEBUG);
-    }
-
-    // set the cluster directory path
-    commandLine.add(Arguments.ARG_CLUSTER_URI, clusterDirectory.toUri());
-
-    if (!isUnset(rmAddr)) {
-      commandLine.add(Arguments.ARG_RM_ADDR, rmAddr);
-    }
-
-    if (serviceArgs.getFilesystemBinding() != null) {
-      commandLine.add(Arguments.ARG_FILESYSTEM, serviceArgs.getFilesystemBinding());
-    }
-
-    // pass the registry binding
-    commandLine.addConfOptionToCLI(config, RegistryConstants.KEY_REGISTRY_ZK_ROOT,
-        RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
-    commandLine.addMandatoryConfOption(config, RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
-
-    if (clusterSecure) {
-      // if the cluster is secure, make sure that
-      // the relevant security settings go over
-      commandLine.addConfOption(config, DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
-    }
-
-    // copy over any/all YARN RM client values, in case the server-side XML conf file
-    // has the 0.0.0.0 address
-    commandLine.addConfOptions(config,
-        YarnConfiguration.RM_ADDRESS,
-        YarnConfiguration.RM_CLUSTER_ID,
-        YarnConfiguration.RM_HOSTNAME,
-        YarnConfiguration.RM_PRINCIPAL);
-
-    // write out the path output
-    commandLine.addOutAndErrFiles(STDOUT_AM, STDERR_AM);
-
-    String cmdStr = commandLine.build();
-    log.debug("Completed setting up app master command {}", cmdStr);
-
-    amLauncher.addCommandLine(commandLine);
-
-    // the Slider AM gets to configure the AM requirements, not the custom provider
-    sliderAM.prepareAMResourceRequirements(sliderAMResourceComponent,
-        amLauncher.getResource());
-
-
-    // Set the priority for the application master
-    amLauncher.setPriority(config.getInt(KEY_YARN_QUEUE_PRIORITY,
-                                   DEFAULT_YARN_QUEUE_PRIORITY));
-
-    // Set the queue to which this application is to be submitted in the RM
-    // Queue for App master
-    String amQueue = config.get(KEY_YARN_QUEUE, DEFAULT_YARN_QUEUE);
-    String suppliedQueue = internalOperations.getGlobalOptions().get(INTERNAL_QUEUE);
-    if(!isUnset(suppliedQueue)) {
-      amQueue = suppliedQueue;
-      log.info("Using queue {} for the application instance.", amQueue);
-    }
-
-    if (isSet(amQueue)) {
-      amLauncher.setQueue(amQueue);
-    }
-    return amLauncher;
-  }
-
-  /**
-   *
-   * @param clustername name of the cluster
-   * @param clusterDirectory cluster dir
-   * @param instanceDefinition the instance definition
-   * @param debugAM enable debug AM options
-   * @param lifetime
-   * @return the launched application
-   * @throws YarnException
-   * @throws IOException
-   */
-  public LaunchedApplication launchApplication(String clustername, Path clusterDirectory,
-      AggregateConf instanceDefinition, boolean debugAM, long lifetime)
-    throws YarnException, IOException {
-
-    AppMasterLauncher amLauncher = setupAppMasterLauncher(clustername,
-        clusterDirectory,
-        instanceDefinition,
-        debugAM, lifetime);
-
-    applicationId = amLauncher.getApplicationId();
-    log.info("Submitting application {}", applicationId);
-
-    // submit the application
-    LaunchedApplication launchedApplication = amLauncher.submitApplication();
-    return launchedApplication;
-  }
-
   protected Map<String, String> getAmLaunchEnv(Configuration config) {
     String sliderAmLaunchEnv = config.get(KEY_AM_LAUNCH_ENV);
     log.debug("{} = {}", KEY_AM_LAUNCH_ENV, sliderAmLaunchEnv);
@@ -2431,95 +1642,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
     return placeholderKeyValueMap;
   }
 
-  private void propagatePythonExecutable(Configuration config,
-                                         AggregateConf instanceDefinition) {
-    String pythonExec = config.get(
-        PYTHON_EXECUTABLE_PATH);
-    if (pythonExec != null) {
-      instanceDefinition.getAppConfOperations().getGlobalOptions().putIfUnset(
-          PYTHON_EXECUTABLE_PATH,
-          pythonExec);
-    }
-  }
-
-
-  /**
-   * Wait for the launched app to be accepted in the time  
-   * and, optionally running.
-   * <p>
-   * If the application
-   *
-   * @param launchedApplication application
-   * @param acceptWaitMillis time in millis to wait for accept
-   * @param runWaitMillis time in millis to wait for the app to be running.
-   * May be null, in which case no wait takes place
-   * @return exit code: success
-   * @throws YarnException
-   * @throws IOException
-   */
-  public int waitForAppRunning(LaunchedApplication launchedApplication,
-      int acceptWaitMillis, int runWaitMillis) throws YarnException, IOException {
-    assert launchedApplication != null;
-    int exitCode;
-    // wait for the submit state to be reached
-    ApplicationReport report = launchedApplication.monitorAppToState(
-      YarnApplicationState.ACCEPTED,
-      new Duration(acceptWaitMillis));
-
-    // may have failed, so check that
-    if (hasAppFinished(report)) {
-      exitCode = buildExitCode(report);
-    } else {
-      // exit unless there is a wait
-
-
-      if (runWaitMillis != 0) {
-        // waiting for state to change
-        Duration duration = new Duration(runWaitMillis * 1000);
-        duration.start();
-        report = launchedApplication.monitorAppToState(
-          YarnApplicationState.RUNNING, duration);
-        if (report != null &&
-            report.getYarnApplicationState() == YarnApplicationState.RUNNING) {
-          exitCode = EXIT_SUCCESS;
-        } else {
-          exitCode = buildExitCode(report);
-        }
-      } else {
-        exitCode = EXIT_SUCCESS;
-      }
-    }
-    return exitCode;
-  }
-
-
-  /**
-   * Propagate any critical principals from the current site config down to the HBase one.
-   * @param config config to read from
-   * @param clusterSpec cluster spec
-   */
-  private void propagatePrincipals(Configuration config,
-                                   AggregateConf clusterSpec) {
-    String dfsPrincipal = config.get(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
-    if (dfsPrincipal != null) {
-      String siteDfsPrincipal = SITE_XML_PREFIX + DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
-      clusterSpec.getAppConfOperations().getGlobalOptions().putIfUnset(
-        siteDfsPrincipal,
-        dfsPrincipal);
-    }
-  }
-
-  /**
-   * Create a path that must exist in the cluster fs
-   * @param uri uri to create
-   * @return the path
-   * @throws FileNotFoundException if the path does not exist
-   */
-  public Path createPathThatMustExist(String uri) throws
-      SliderException, IOException {
-    return sliderFileSystem.createPathThatMustExist(uri);
-  }
-
   /**
    * verify that a live cluster isn't there
    * @param clustername cluster name
@@ -2527,7 +1649,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
    * @throws SliderException with exit code EXIT_CLUSTER_LIVE
    * if a cluster of that name is either live or starting up.
    */
-  public void verifyNoLiveClusters(String clustername, String action) throws
+  public void verifyNoLiveApp(String clustername, String action) throws
                                                        IOException,
                                                        YarnException {
     List<ApplicationReport> existing = findAllLiveInstances(clustername);
@@ -2554,11 +1676,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
     return deployedClusterName;
   }
 
-  @VisibleForTesting
-  public void setDeployedClusterName(String deployedClusterName) {
-    this.deployedClusterName = deployedClusterName;
-  }
-
   /**
    * ask if the client is using a mini MR cluster
    * @return true if they are
@@ -2568,109 +1685,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
         false);
   }
 
-  /**
-   * Get the application name used in the zookeeper root paths
-   * @return an application-specific path in ZK
-   */
-  private String getAppName() {
-    return "slider";
-  }
-
-  /**
-   * Wait for the app to start running (or go past that state)
-   * @param duration time to wait
-   * @return the app report; null if the duration turned out
-   * @throws YarnException YARN or app issues
-   * @throws IOException IO problems
-   */
-  @VisibleForTesting
-  public ApplicationReport monitorAppToRunning(Duration duration)
-    throws YarnException, IOException {
-    return monitorAppToState(YarnApplicationState.RUNNING, duration);
-  }
-
-  /**
-   * Build an exit code for an application from its report.
-   * If the report parameter is null, its interpreted as a timeout
-   * @param report report application report
-   * @return the exit code
-   * @throws IOException
-   * @throws YarnException
-   */
-  private int buildExitCode(ApplicationReport report) throws
-                                                      IOException,
-                                                      YarnException {
-    if (null == report) {
-      return EXIT_TIMED_OUT;
-    }
-
-    YarnApplicationState state = report.getYarnApplicationState();
-    FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
-    switch (state) {
-      case FINISHED:
-        if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
-          log.info("Application has completed successfully");
-          return EXIT_SUCCESS;
-        } else {
-          log.info("Application finished unsuccessfully." +
-                   "YarnState = {}, DSFinalStatus = {} Breaking monitoring loop",
-                   state, dsStatus);
-          return EXIT_YARN_SERVICE_FINISHED_WITH_ERROR;
-        }
-
-      case KILLED:
-        log.info("Application did not finish. YarnState={}, DSFinalStatus={}",
-                 state, dsStatus);
-        return EXIT_YARN_SERVICE_KILLED;
-
-      case FAILED:
-        log.info("Application Failed. YarnState={}, DSFinalStatus={}", state,
-                 dsStatus);
-        return EXIT_YARN_SERVICE_FAILED;
-
-      default:
-        //not in any of these states
-        return EXIT_SUCCESS;
-    }
-  }
-
-  /**
-   * Monitor the submitted application for reaching the requested state.
-   * Will also report if the app reaches a later state (failed, killed, etc)
-   * Kill application if duration!= null & time expires. 
-   * Prerequisite: the applicatin was launched.
-   * @param desiredState desired state.
-   * @param duration how long to wait -must be more than 0
-   * @return the application report -null on a timeout
-   * @throws YarnException
-   * @throws IOException
-   */
-  @VisibleForTesting
-  public ApplicationReport monitorAppToState(
-    YarnApplicationState desiredState,
-    Duration duration)
-    throws YarnException, IOException {
-    LaunchedApplication launchedApplication =
-      new LaunchedApplication(applicationId, yarnClient);
-    return launchedApplication.monitorAppToState(desiredState, duration);
-  }
-
-  @Override
-  public ApplicationReport getApplicationReport() throws
-                                                  IOException,
-                                                  YarnException {
-    return getApplicationReport(applicationId);
-  }
-
-  @Override
-  public boolean forceKillApplication(String reason)
-    throws YarnException, IOException {
-    if (applicationId != null) {
-      new LaunchedApplication(applicationId, yarnClient).forceKill(reason);
-      return true;
-    }
-    return false;
-  }
 
   /**
    * List Slider instances belonging to a specific user with a specific app
@@ -2721,23 +1735,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
   }
 
   /**
-   * Retrieve a list of all live instances. If clustername is supplied then it
-   * returns this specific cluster, if and only if it exists and is live.
-   * 
-   * @param clustername
-   *          cluster name (if looking for a specific live cluster)
-   * @return the list of application names which satisfies the list criteria
-   * @throws IOException
-   * @throws YarnException
-   */
-  public Set<ApplicationReport> getApplicationList(String clustername)
-      throws IOException, YarnException {
-    ActionListArgs args = new ActionListArgs();
-    args.live = true;
-    return getApplicationList(clustername, args);
-  }
-
-  /**
    * Retrieve a list of application instances satisfying the query criteria.
    * 
    * @param clustername
@@ -2757,8 +1754,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
       // the above call throws an exception so the return is not really required
       return Collections.emptySet();
     }
-    verifyBindingsDefined();
-
     boolean live = args.live;
     String state = args.state;
     boolean listContainers = args.containers;
@@ -2868,29 +1863,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
     }
   }
 
-  /**
-   * Enumerate slider instances for the current user, and the
-   * most recent app report, where available.
-   * @param listOnlyInState boolean to indicate that the instances should
-   * only include those in a YARN state
-   * <code> minAppState &lt;= currentState &lt;= maxAppState </code>
-   *
-   * @param minAppState minimum application state to include in enumeration.
-   * @param maxAppState maximum application state to include
-   * @return a map of application instance name to description
-   * @throws IOException Any IO problem
-   * @throws YarnException YARN problems
-   */
-  @Override
-  public Map<String, SliderInstanceDescription> enumSliderInstances(
-      boolean listOnlyInState,
-      YarnApplicationState minAppState,
-      YarnApplicationState maxAppState)
-      throws IOException, YarnException {
-    return yarnAppListClient.enumSliderInstances(listOnlyInState,
-        minAppState,
-        maxAppState);
-  }
 
   /**
    * Extract the state of a Yarn application --state argument
@@ -2928,22 +1900,16 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
 
   @Override
   @VisibleForTesting
-  public int actionFlex(String name, ActionFlexArgs args) throws YarnException, IOException {
-    validateClusterName(name);
-    Map<String, String> roleMap = args.getComponentMap();
-    // throw usage exception if no changes proposed
-    if (roleMap.size() == 0) {
-      actionHelp(ACTION_FLEX);
-    }
-    verifyBindingsDefined();
-    log.debug("actionFlex({})", name);
-    Map<String, String> roleInstances = new HashMap<>();
-    for (Map.Entry<String, String> roleEntry : roleMap.entrySet()) {
-      String key = roleEntry.getKey();
-      String val = roleEntry.getValue();
-      roleInstances.put(key, val);
-    }
-    return flex(name, roleInstances);
+  public void actionFlex(String appName, ActionFlexArgs args)
+      throws YarnException, IOException {
+    Component component = new Component();
+    component.setNumberOfContainers(args.getNumberOfContainers());
+    if (StringUtils.isEmpty(args.getComponent())) {
+      component.setName("DEFAULT");
+    } else {
+      component.setName(args.getComponent());
+    }
+    flex(appName, component);
   }
 
   @Override
@@ -2954,7 +1920,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
   }
 
   public int actionExists(String name, ActionExistsArgs args) throws YarnException, IOException {
-    verifyBindingsDefined();
     validateClusterName(name);
     boolean checkLive = args.live;
     log.debug("actionExists({}, {}, {})", name, checkLive, args.state);
@@ -3050,14 +2015,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
   }
 
   /**
-   * Get at the service registry operations
-   * @return registry client -valid after the service is inited.
-   */
-  public YarnAppListClient getYarnAppListClient() {
-    return yarnAppListClient;
-  }
-
-  /**
    * Find an instance of an application belonging to the current user.
    * @param appname application name
    * @return the app report or null if none is found
@@ -3128,20 +2085,20 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
       return EXIT_SUCCESS;
     }
 
-    ClusterDescription status = verifyAndGetClusterDescription(clustername);
+    Application application = getApplication(clustername);
     String outfile = statusArgs.getOutput();
     if (outfile == null) {
-      log.info(status.toJsonString());
+      log.info(application.toString());
     } else {
-      status.save(new File(outfile).getAbsoluteFile());
+      jsonSerDeser.save(application, new File(statusArgs.getOutput()));
     }
     return EXIT_SUCCESS;
   }
 
   @Override
-  public String actionStatus(String clustername)
+  public Application actionStatus(String clustername)
       throws YarnException, IOException {
-    return verifyAndGetClusterDescription(clustername).toJsonString();
+    return getApplication(clustername);
   }
 
   private void queryAndPrintLifetime(String appName)
@@ -3170,13 +2127,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
     }
   }
 
-  private ClusterDescription verifyAndGetClusterDescription(String clustername)
-      throws YarnException, IOException {
-    verifyBindingsDefined();
-    validateClusterName(clustername);
-    return getClusterDescription(clustername);
-  }
-
   @Override
   public int actionVersion() {
     SliderVersionInfo.loadAndPrintVersionInfo(log);
@@ -3184,269 +2134,106 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
   }
 
   @Override
-  public int actionFreeze(String clustername,
-      ActionFreezeArgs freezeArgs) throws YarnException, IOException {
-    verifyBindingsDefined();
-    validateClusterName(clustername);
-    int waittime = freezeArgs.getWaittime();
-    String text = freezeArgs.message;
-    boolean forcekill = freezeArgs.force;
-    log.debug("actionFreeze({}, reason={}, wait={}, force={})", clustername,
-              text,
-              waittime,
-              forcekill);
-    
-    //is this actually a known cluster?
-    sliderFileSystem.locateInstanceDefinition(clustername);
-    ApplicationReport app = findInstance(clustername);
+  public void actionStop(String appName, ActionFreezeArgs freezeArgs)
+      throws YarnException, IOException {
+    validateClusterName(appName);
+    ApplicationReport app = findInstance(appName);
     if (app == null) {
-      // exit early
-      log.info("Cluster {} not running", clustername);
-      // not an error to stop a stopped cluster
-      return EXIT_SUCCESS;
-    }
-    log.debug("App to stop was found: {}:\n{}", clustername,
-              new OnDemandReportStringifier(app));
-    if (app.getYarnApplicationState().ordinal() >=
-        YarnApplicationState.FINISHED.ordinal()) {
-      log.info("Cluster {} is in a terminated state {}", clustername,
-               app.getYarnApplicationState());
-      return EXIT_SUCCESS;
+      throw new ApplicationNotFoundException(
+          "Application " + appName + " doesn't exist in RM.");
     }
 
-    // IPC request for a managed shutdown is only possible if the app is running.
-    // so we need to force kill if the app is accepted or submitted
-    if (!forcekill
-        && app.getYarnApplicationState().ordinal() < YarnApplicationState.RUNNING.ordinal()) {
-      log.info("Cluster {} is in a pre-running state {}. Force killing it", clustername,
+    if (app.getYarnApplicationState().ordinal() >= YarnApplicationState.FINISHED
+        .ordinal()) {
+      log.info("Application {} is in a terminated state {}", appName,
           app.getYarnApplicationState());
-      forcekill = true;
-    }
-
-    LaunchedApplication application = new LaunchedApplication(yarnClient, ap

<TRUNCATED>

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message