Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AEC0E200CBD for ; Wed, 21 Jun 2017 20:33:33 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id ACA0C160BF3; Wed, 21 Jun 2017 18:33:33 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 77D2D160BF0 for ; Wed, 21 Jun 2017 20:33:30 +0200 (CEST) Received: (qmail 41668 invoked by uid 500); 21 Jun 2017 18:33:27 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 40166 invoked by uid 99); 21 Jun 2017 18:33:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Jun 2017 18:33:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4B32DE8F02; Wed, 21 Jun 2017 18:33:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: jianhe@apache.org To: common-commits@hadoop.apache.org Date: Wed, 21 Jun 2017 18:33:34 -0000 Message-Id: <453792cce6a047cea38f56ac12887c54@git.apache.org> In-Reply-To: <00450cda78594bd9a043fcb379bcf7c3@git.apache.org> References: <00450cda78594bd9a043fcb379bcf7c3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/50] [abbrv] hadoop git commit: YARN-6255. Refactor yarn-native-services framework. Contributed by Jian He archived-at: Wed, 21 Jun 2017 18:33:33 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/68ec5e78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java index 00e2b62..f4ea70b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java @@ -20,7 +20,6 @@ package org.apache.slider.client; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.Files; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; @@ -35,7 +34,6 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.registry.client.api.RegistryConstants; import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.registry.client.binding.RegistryPathUtils; @@ -55,40 +53,44 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationTimeout; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Times; -import org.apache.slider.api.ClusterDescription; import org.apache.slider.api.ClusterNode; import org.apache.slider.api.SliderApplicationApi; import org.apache.slider.api.SliderClusterProtocol; -import org.apache.slider.api.StateValues; import org.apache.slider.api.proto.Messages; +import org.apache.slider.api.resource.Application; +import org.apache.slider.api.resource.Component; import org.apache.slider.api.types.ContainerInformation; import org.apache.slider.api.types.NodeInformationList; -import org.apache.slider.api.types.SliderInstanceDescription; import org.apache.slider.client.ipc.SliderApplicationIpcClient; import org.apache.slider.client.ipc.SliderClusterOperations; import org.apache.slider.common.Constants; import org.apache.slider.common.SliderExitCodes; import org.apache.slider.common.SliderKeys; +import org.apache.slider.common.SliderXmlConfKeys; import org.apache.slider.common.params.AbstractActionArgs; import org.apache.slider.common.params.AbstractClusterBuildingActionArgs; import org.apache.slider.common.params.ActionAMSuicideArgs; import org.apache.slider.common.params.ActionClientArgs; import org.apache.slider.common.params.ActionCreateArgs; import org.apache.slider.common.params.ActionDependencyArgs; -import org.apache.slider.common.params.ActionDestroyArgs; import org.apache.slider.common.params.ActionDiagnosticArgs; import org.apache.slider.common.params.ActionEchoArgs; import org.apache.slider.common.params.ActionExistsArgs; @@ -113,20 +115,13 @@ import org.apache.slider.common.params.ActionUpgradeArgs; import org.apache.slider.common.params.Arguments; import org.apache.slider.common.params.ClientArgs; import org.apache.slider.common.params.CommonArgs; -import org.apache.slider.common.params.LaunchArgsAccessor; import org.apache.slider.common.tools.ConfigHelper; -import org.apache.slider.common.tools.Duration; import org.apache.slider.common.tools.SliderFileSystem; import org.apache.slider.common.tools.SliderUtils; import org.apache.slider.common.tools.SliderVersionInfo; -import org.apache.slider.core.buildutils.InstanceBuilder; import org.apache.slider.core.buildutils.InstanceIO; import org.apache.slider.core.conf.AggregateConf; import org.apache.slider.core.conf.ConfTree; -import org.apache.slider.core.conf.ConfTreeOperations; -import org.apache.slider.core.conf.MapOperations; -import org.apache.slider.core.conf.ResourcesInputPropertiesValidator; -import org.apache.slider.core.conf.TemplateInputPropertiesValidator; import org.apache.slider.core.exceptions.BadClusterStateException; import org.apache.slider.core.exceptions.BadCommandArgumentsException; import org.apache.slider.core.exceptions.BadConfigException; @@ -137,18 +132,13 @@ import org.apache.slider.core.exceptions.SliderException; import org.apache.slider.core.exceptions.UnknownApplicationInstanceException; import org.apache.slider.core.exceptions.UsageException; import org.apache.slider.core.exceptions.WaitTimeoutException; -import org.apache.slider.core.launch.AppMasterLauncher; import org.apache.slider.core.launch.ClasspathConstructor; import org.apache.slider.core.launch.CredentialUtils; import org.apache.slider.core.launch.JavaCommandLineBuilder; -import org.apache.slider.core.launch.LaunchedApplication; import org.apache.slider.core.launch.SerializedApplicationReport; import org.apache.slider.core.main.RunService; -import org.apache.slider.core.persist.AppDefinitionPersister; import org.apache.slider.core.persist.ApplicationReportSerDeser; -import org.apache.slider.core.persist.ConfPersister; import org.apache.slider.core.persist.JsonSerDeser; -import org.apache.slider.core.persist.LockAcquireFailedException; import org.apache.slider.core.registry.SliderRegistryUtils; import org.apache.slider.core.registry.YarnAppListClient; import org.apache.slider.core.registry.docstore.ConfigFormat; @@ -160,19 +150,19 @@ import org.apache.slider.core.registry.docstore.PublishedExportsSet; import org.apache.slider.core.registry.retrieve.RegistryRetriever; import org.apache.slider.core.zk.BlockingZKWatcher; import org.apache.slider.core.zk.ZKIntegration; -import org.apache.slider.core.zk.ZKPathBuilder; import org.apache.slider.providers.AbstractClientProvider; +import org.apache.slider.providers.ProviderUtils; import org.apache.slider.providers.SliderProviderFactory; import org.apache.slider.providers.agent.AgentKeys; -import org.apache.slider.providers.docker.DockerClientProvider; -import org.apache.slider.providers.slideram.SliderAMClientProvider; import org.apache.slider.server.appmaster.SliderAppMaster; import org.apache.slider.server.appmaster.rpc.RpcBinder; import org.apache.slider.server.services.utility.AbstractSliderLaunchedService; +import org.apache.slider.util.ServiceApiUtil; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; +import org.codehaus.jackson.map.PropertyNamingStrategy; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; @@ -182,7 +172,6 @@ import java.io.ByteArrayOutputStream; import java.io.Console; import java.io.File; import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; @@ -191,10 +180,7 @@ import java.io.PrintStream; import java.io.PrintWriter; import java.io.StringWriter; import java.io.Writer; -import java.net.InetSocketAddress; -import java.net.URISyntaxException; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -213,14 +199,11 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import static org.apache.hadoop.registry.client.binding.RegistryUtils.*; -import static org.apache.slider.api.InternalKeys.*; -import static org.apache.slider.api.OptionKeys.*; -import static org.apache.slider.api.ResourceKeys.*; +import static org.apache.slider.api.InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH; import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG; import static org.apache.slider.common.params.SliderActions.*; import static org.apache.slider.common.tools.SliderUtils.*; - /** * Client service for Slider */ @@ -246,6 +229,9 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe public static final String E_NO_RESOURCE_MANAGER = "No valid Resource Manager address provided"; public static final String E_PACKAGE_EXISTS = "Package exists"; private static PrintStream clientOutputStream = System.out; + private static final JsonSerDeser jsonSerDeser = + new JsonSerDeser(Application.class, + PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); // value should not be changed without updating string find in slider.py private static final String PASSWORD_PROMPT = "Enter password for"; @@ -362,16 +348,22 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe serviceArgs.getActionAMSuicideArgs()); break; - case ACTION_BUILD: - exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs()); - break; - case ACTION_CLIENT: exitCode = actionClient(serviceArgs.getActionClientArgs()); break; case ACTION_CREATE: - exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs()); + ActionCreateArgs args = serviceArgs.getActionCreateArgs(); + File file = args.getAppDef(); + Path filePath = new Path(file.getAbsolutePath()); + log.info("Loading app definition from: " + filePath); + Application application = + jsonSerDeser.load(FileSystem.getLocal(getConfig()), filePath); + if(args.lifetime > 0) { + application.setLifetime(args.lifetime); + } + application.setName(clusterName); + actionCreate(application); break; case ACTION_DEPENDENCY: @@ -379,7 +371,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe break; case ACTION_DESTROY: - exitCode = actionDestroy(clusterName, serviceArgs.getActionDestroyArgs()); + actionDestroy(clusterName); break; case ACTION_DIAGNOSTICS: @@ -392,11 +384,11 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe break; case ACTION_FLEX: - exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs()); + actionFlex(clusterName, serviceArgs.getActionFlexArgs()); break; - case ACTION_FREEZE: - exitCode = actionFreeze(clusterName, serviceArgs.getActionFreezeArgs()); + case ACTION_STOP: + actionStop(clusterName, serviceArgs.getActionFreezeArgs()); break; case ACTION_HELP: @@ -456,8 +448,8 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe exitCode = actionStatus(clusterName, serviceArgs.getActionStatusArgs()); break; - case ACTION_THAW: - exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs()); + case ACTION_START: + exitCode = actionStart(clusterName, serviceArgs.getActionThawArgs()); break; case ACTION_TOKENS: @@ -516,7 +508,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe String zkPath = ZKIntegration.mkClusterPath(user, clusterName); Exception e = null; try { - Configuration config = getConfig(); ZKIntegration client = getZkClient(clusterName, user); if (client != null) { if (client.exists(zkPath)) { @@ -627,76 +618,31 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe * force=true by default. */ @Override - public int actionDestroy(String clustername) throws YarnException, - IOException { - ActionDestroyArgs destroyArgs = new ActionDestroyArgs(); - destroyArgs.force = true; - return actionDestroy(clustername, destroyArgs); - } - - @Override - public int actionDestroy(String clustername, - ActionDestroyArgs destroyArgs) throws YarnException, IOException { - // verify that a live cluster isn't there - validateClusterName(clustername); - //no=op, it is now mandatory. - verifyBindingsDefined(); - verifyNoLiveClusters(clustername, "Destroy"); - boolean forceDestroy = destroyArgs.force; - log.debug("actionDestroy({}, force={})", clustername, forceDestroy); - - // create the directory path - Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); - // delete the directory; + public void actionDestroy(String appName) + throws YarnException, IOException { + validateClusterName(appName); + Path appDir = sliderFileSystem.buildClusterDirPath(appName); FileSystem fs = sliderFileSystem.getFileSystem(); - boolean exists = fs.exists(clusterDirectory); - if (exists) { - log.debug("Application Instance {} found at {}: destroying", clustername, clusterDirectory); - if (!forceDestroy) { - // fail the command if --force is not explicitly specified - throw new UsageException("Destroy will permanently delete directories and registries. " - + "Reissue this command with the --force option if you want to proceed."); - } - if (!fs.delete(clusterDirectory, true)) { - log.warn("Filesystem returned false from delete() operation"); - } - - if(!deleteZookeeperNode(clustername)) { - log.warn("Unable to perform node cleanup in Zookeeper."); - } - - if (fs.exists(clusterDirectory)) { - log.warn("Failed to delete {}", clusterDirectory); + if (fs.exists(appDir)) { + if (fs.delete(appDir, true)) { + log.info("Successfully deleted application + " + appName); + return; + } else { + String message = + "Failed to delete application + " + appName + " at: " + appDir; + log.info(message); + throw new YarnException(message); } - - } else { - log.debug("Application Instance {} already destroyed", clustername); - } - - // rm the registry entry —do not let this block the destroy operations - String registryPath = SliderRegistryUtils.registryPathForInstance( - clustername); - try { - getRegistryOperations().delete(registryPath, true); - } catch (IOException e) { - log.warn("Error deleting registry entry {}: {} ", registryPath, e, e); - } catch (SliderException e) { - log.warn("Error binding to registry {} ", e, e); } - - List instances = findAllLiveInstances(clustername); - // detect any race leading to cluster creation during the check/destroy process - // and report a problem. - if (!instances.isEmpty()) { - throw new SliderException(EXIT_APPLICATION_IN_USE, - clustername + ": " - + E_DESTROY_CREATE_RACE_CONDITION - + " :" + - instances.get(0)); + if (!deleteZookeeperNode(appName)) { + String message = + "Failed to cleanup cleanup application " + appName + " in zookeeper"; + log.warn(message); + throw new YarnException(message); } - log.info("Destroyed cluster {}", clustername); - return EXIT_SUCCESS; + //TODO clean registry } + @Override public int actionAmSuicide(String clustername, @@ -715,203 +661,285 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe return factory.createClientProvider(); } - /** - * Create the cluster -saving the arguments to a specification file first - * @param clustername cluster name - * @return the status code - * @throws YarnException Yarn problems - * @throws IOException other problems - * @throws BadCommandArgumentsException bad arguments. - */ - public int actionCreate(String clustername, ActionCreateArgs createArgs) throws - YarnException, - IOException { - - actionBuild(clustername, createArgs); - Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); - AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( - clustername, clusterDirectory); - try { - checkForCredentials(getConfig(), instanceDefinition.getAppConf(), - clustername); - } catch (IOException e) { - sliderFileSystem.getFileSystem().delete(clusterDirectory, true); - throw e; - } - return startCluster(clustername, createArgs, createArgs.lifetime); - } - - @Override - public int actionUpgrade(String clustername, ActionUpgradeArgs upgradeArgs) - throws YarnException, IOException { - File template = upgradeArgs.template; - File resources = upgradeArgs.resources; - List containers = upgradeArgs.containers; - List components = upgradeArgs.components; - - // For upgrade spec, let's be little more strict with validation. If either - // --template or --resources is specified, then both needs to be specified. - // Otherwise the internal app config and resources states of the app will be - // unwantedly modified and the change will take effect to the running app - // immediately. - require(!(template != null && resources == null), - "Option %s must be specified with option %s", - Arguments.ARG_RESOURCES, Arguments.ARG_TEMPLATE); - - require(!(resources != null && template == null), - "Option %s must be specified with option %s", - Arguments.ARG_TEMPLATE, Arguments.ARG_RESOURCES); - - // For upgrade spec, both --template and --resources should be specified - // and neither of --containers or --components should be used - if (template != null && resources != null) { - require(CollectionUtils.isEmpty(containers), - "Option %s cannot be specified with %s or %s", - Arguments.ARG_CONTAINERS, Arguments.ARG_TEMPLATE, - Arguments.ARG_RESOURCES); - require(CollectionUtils.isEmpty(components), - "Option %s cannot be specified with %s or %s", - Arguments.ARG_COMPONENTS, Arguments.ARG_TEMPLATE, - Arguments.ARG_RESOURCES); - - // not an error to try to upgrade a stopped cluster, just return success - // code, appropriate log messages have already been dumped - if (!isAppInRunningState(clustername)) { - return EXIT_SUCCESS; - } - // Now initiate the upgrade spec flow - buildInstanceDefinition(clustername, upgradeArgs, true, true, true); - SliderClusterOperations clusterOperations = createClusterOperations(clustername); - clusterOperations.amSuicide("AM restarted for application upgrade", 1, 1000); - return EXIT_SUCCESS; - } + public ApplicationId actionCreate(Application application) + throws IOException, YarnException { + ServiceApiUtil.validateApplicationPostPayload(application); + String appName = application.getName(); + validateClusterName(appName); + verifyNoLiveApp(appName, "Create"); + Path appDir = checkAppNotExistOnHdfs(application); - // Since neither --template or --resources were specified, it is upgrade - // containers flow. Here any one or both of --containers and --components - // can be specified. If a container is specified with --containers option - // and also belongs to a component type specified with --components, it will - // be upgraded only once. - return actionUpgradeContainers(clustername, upgradeArgs); + ApplicationId appId = submitApp(application); + application.setId(appId.toString()); + // write app definition on to hdfs + persistApp(appDir, application); + return appId; + //TODO deal with registry } - private int actionUpgradeContainers(String clustername, - ActionUpgradeArgs upgradeArgs) throws YarnException, IOException { - verifyBindingsDefined(); - validateClusterName(clustername); - int waittime = upgradeArgs.getWaittime(); // ignored for now - String text = "Upgrade containers"; - log.debug("actionUpgradeContainers({}, reason={}, wait={})", clustername, - text, waittime); - - // not an error to try to upgrade a stopped cluster, just return success - // code, appropriate log messages have already been dumped - if (!isAppInRunningState(clustername)) { - return EXIT_SUCCESS; - } - - // Create sets of containers and components to get rid of duplicates and - // for quick lookup during checks below - Set containers = new HashSet<>(); - if (upgradeArgs.containers != null) { - containers.addAll(new ArrayList<>(upgradeArgs.containers)); - } - Set components = new HashSet<>(); - if (upgradeArgs.components != null) { - components.addAll(new ArrayList<>(upgradeArgs.components)); - } - - // check validity of component names and running containers here - List liveContainers = getContainers(clustername); - Set validContainers = new HashSet<>(); - Set validComponents = new HashSet<>(); - for (ContainerInformation liveContainer : liveContainers) { - boolean allContainersAndComponentsAccountedFor = true; - if (CollectionUtils.isNotEmpty(containers)) { - if (containers.contains(liveContainer.containerId)) { - containers.remove(liveContainer.containerId); - validContainers.add(liveContainer.containerId); - } - allContainersAndComponentsAccountedFor = false; - } - if (CollectionUtils.isNotEmpty(components)) { - if (components.contains(liveContainer.component)) { - components.remove(liveContainer.component); - validComponents.add(liveContainer.component); - } - allContainersAndComponentsAccountedFor = false; - } - if (allContainersAndComponentsAccountedFor) { - break; + private ApplicationId submitApp(Application app) + throws IOException, YarnException { + String appName = app.getName(); + Configuration conf = getConfig(); + Path appRootDir = sliderFileSystem.buildClusterDirPath(app.getName()); + deployedClusterName = appName; + + YarnClientApplication yarnApp = yarnClient.createApplication(); + ApplicationSubmissionContext submissionContext = + yarnApp.getApplicationSubmissionContext(); + applicationId = submissionContext.getApplicationId(); + submissionContext.setKeepContainersAcrossApplicationAttempts(true); + if (app.getLifetime() > 0) { + Map appTimeout = new HashMap<>(); + appTimeout.put(ApplicationTimeoutType.LIFETIME, app.getLifetime()); + submissionContext.setApplicationTimeouts(appTimeout); + } + submissionContext.setMaxAppAttempts(conf.getInt(KEY_AM_RESTART_LIMIT, 2)); + + Map localResources = + new HashMap(); + + // copy local slideram-log4j.properties to hdfs and add to localResources + boolean hasSliderAMLog4j = + addAMLog4jResource(appName, conf, localResources); + // copy jars to hdfs and add to localResources + Path tempPath = addJarResource(appName, localResources); + // add keytab if in secure env + addKeytabResourceIfSecure(sliderFileSystem, localResources, conf, appName); + printLocalResources(localResources); + + //TODO SliderAMClientProvider#copyEnvVars + //TODO localResource putEnv + + Map env = addAMEnv(conf, tempPath); + + // create AM CLI + String cmdStr = + buildCommandLine(appName, conf, appRootDir, hasSliderAMLog4j); + + //TODO set log aggregation context + //TODO set retry window + submissionContext.setResource(Resource.newInstance( + conf.getLong(KEY_AM_RESOURCE_MEM, DEFAULT_KEY_AM_RESOURCE_MEM), 1)); + submissionContext.setQueue(conf.get(KEY_YARN_QUEUE, DEFAULT_YARN_QUEUE)); + submissionContext.setApplicationName(appName); + submissionContext.setApplicationType(SliderKeys.APP_TYPE); + Set appTags = + AbstractClientProvider.createApplicationTags(appName, null, null); + if (!appTags.isEmpty()) { + submissionContext.setApplicationTags(appTags); + } + ContainerLaunchContext amLaunchContext = + Records.newRecord(ContainerLaunchContext.class); + amLaunchContext.setCommands(Collections.singletonList(cmdStr)); + amLaunchContext.setEnvironment(env); + amLaunchContext.setLocalResources(localResources); + addCredentialsIfSecure(conf, amLaunchContext); + submissionContext.setAMContainerSpec(amLaunchContext); + yarnClient.submitApplication(submissionContext); + return submissionContext.getApplicationId(); + } + + private void printLocalResources(Map map) { + log.info("Added LocalResource for localization: "); + StringBuilder builder = new StringBuilder(); + for (Map.Entry entry : map.entrySet()) { + builder.append(entry.getKey()).append(" -> ") + .append(entry.getValue().getResource().getFile()) + .append(System.lineSeparator()); + } + log.info(builder.toString()); + } + + private void addCredentialsIfSecure(Configuration conf, + ContainerLaunchContext amLaunchContext) throws IOException { + if (UserGroupInformation.isSecurityEnabled()) { + // pick up oozie credentials + Credentials credentials = + CredentialUtils.loadTokensFromEnvironment(System.getenv(), conf); + if (credentials == null) { + // nothing from oozie, so build up directly + credentials = new Credentials( + UserGroupInformation.getCurrentUser().getCredentials()); + CredentialUtils.addRMRenewableFSDelegationTokens(conf, + sliderFileSystem.getFileSystem(), credentials); + } else { + log.info("Using externally supplied credentials to launch AM"); } + amLaunchContext.setTokens(CredentialUtils.marshallCredentials(credentials)); } + } - // If any item remains in containers or components then they are invalid. - // Log warning for them and proceed. - if (CollectionUtils.isNotEmpty(containers)) { - log.warn("Invalid set of containers provided {}", containers); - } - if (CollectionUtils.isNotEmpty(components)) { - log.warn("Invalid set of components provided {}", components); + private String buildCommandLine(String appName, Configuration conf, + Path appRootDir, boolean hasSliderAMLog4j) throws BadConfigException { + JavaCommandLineBuilder CLI = new JavaCommandLineBuilder(); + CLI.forceIPv4().headless(); + //TODO CLI.setJVMHeap + //TODO CLI.addJVMOPTS + if (hasSliderAMLog4j) { + CLI.sysprop(SYSPROP_LOG4J_CONFIGURATION, LOG4J_SERVER_PROP_FILENAME); + CLI.sysprop(SYSPROP_LOG_DIR, ApplicationConstants.LOG_DIR_EXPANSION_VAR); } - - // If not a single valid container or component is specified do not proceed - if (CollectionUtils.isEmpty(validContainers) - && CollectionUtils.isEmpty(validComponents)) { - log.error("Not a single valid container or component specified. Nothing to do."); - return EXIT_NOT_FOUND; + CLI.add(SliderAppMaster.SERVICE_CLASSNAME); + CLI.add(ACTION_CREATE, appName); + //TODO debugAM CLI.add(Arguments.ARG_DEBUG) + CLI.add(Arguments.ARG_CLUSTER_URI, appRootDir.toUri()); +// InetSocketAddress rmSchedulerAddress = getRmSchedulerAddress(conf); +// String rmAddr = NetUtils.getHostPortString(rmSchedulerAddress); +// CLI.add(Arguments.ARG_RM_ADDR, rmAddr); + // pass the registry binding + CLI.addConfOptionToCLI(conf, RegistryConstants.KEY_REGISTRY_ZK_ROOT, + RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT); + CLI.addMandatoryConfOption(conf, RegistryConstants.KEY_REGISTRY_ZK_QUORUM); + if(isHadoopClusterSecure(conf)) { + //TODO Is this required ?? + // if the cluster is secure, make sure that + // the relevant security settings go over + CLI.addConfOption(conf, DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY); } +// // copy over any/all YARN RM client values, in case the server-side XML conf file +// // has the 0.0.0.0 address +// CLI.addConfOptions(conf, YarnConfiguration.RM_ADDRESS, +// YarnConfiguration.RM_CLUSTER_ID, YarnConfiguration.RM_HOSTNAME, +// YarnConfiguration.RM_PRINCIPAL); - SliderClusterProtocol appMaster = connect(findInstance(clustername)); - Messages.UpgradeContainersRequestProto r = - Messages.UpgradeContainersRequestProto - .newBuilder() - .setMessage(text) - .addAllContainer(validContainers) - .addAllComponent(validComponents) - .build(); - appMaster.upgradeContainers(r); - log.info("Cluster upgrade issued for -"); - if (CollectionUtils.isNotEmpty(validContainers)) { - log.info(" Containers (total {}): {}", validContainers.size(), - validContainers); - } - if (CollectionUtils.isNotEmpty(validComponents)) { - log.info(" Components (total {}): {}", validComponents.size(), - validComponents); - } + // write out the path output + CLI.addOutAndErrFiles(STDOUT_AM, STDERR_AM); + String cmdStr = CLI.build(); + log.info("Completed setting up app master command: {}", cmdStr); + return cmdStr; + } - return EXIT_SUCCESS; + private Map addAMEnv(Configuration conf, Path tempPath) + throws IOException { + Map env = new HashMap(); + ClasspathConstructor classpath = + buildClasspath(SliderKeys.SUBMITTED_CONF_DIR, "lib", + sliderFileSystem, getUsingMiniMRCluster()); + env.put("CLASSPATH", classpath.buildClasspath()); + env.put("LANG", "en_US.UTF-8"); + env.put("LC_ALL", "en_US.UTF-8"); + env.put("LANGUAGE", "en_US.UTF-8"); + String jaas = System.getenv(HADOOP_JAAS_DEBUG); + if (jaas != null) { + env.put(HADOOP_JAAS_DEBUG, jaas); + } + env.putAll(getAmLaunchEnv(conf)); + log.info("AM env: \n{}", stringifyMap(env)); + return env; + } + + private Path addJarResource(String appName, + Map localResources) + throws IOException, SliderException { + Path libPath = sliderFileSystem.buildClusterDirPath(appName); + ProviderUtils + .addProviderJar(localResources, SliderAppMaster.class, SLIDER_JAR, + sliderFileSystem, libPath, "lib", false); + Path dependencyLibTarGzip = sliderFileSystem.getDependencyTarGzip(); + if (sliderFileSystem.isFile(dependencyLibTarGzip)) { + log.info("Loading lib tar from " + sliderFileSystem.getFileSystem() + .getScheme() + ": " + dependencyLibTarGzip); + SliderUtils.putAmTarGzipAndUpdate(localResources, sliderFileSystem); + } else { + String[] libs = SliderUtils.getLibDirs(); + log.info("Loading dependencies from local file system: " + Arrays + .toString(libs)); + for (String libDirProp : libs) { + ProviderUtils + .addAllDependencyJars(localResources, sliderFileSystem, libPath, + "lib", libDirProp); + } + } + return libPath; + } + + private boolean addAMLog4jResource(String appName, Configuration conf, + Map localResources) + throws IOException, BadClusterStateException { + boolean hasSliderAMLog4j = false; + String hadoopConfDir = + System.getenv(ApplicationConstants.Environment.HADOOP_CONF_DIR.name()); + if (hadoopConfDir != null) { + File localFile = + new File(hadoopConfDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME); + if (localFile.exists()) { + Path localFilePath = createLocalPath(localFile); + Path appDirPath = sliderFileSystem.buildClusterDirPath(appName); + Path remoteConfPath = + new Path(appDirPath, SliderKeys.SUBMITTED_CONF_DIR); + Path remoteFilePath = + new Path(remoteConfPath, SliderKeys.LOG4J_SERVER_PROP_FILENAME); + copy(conf, localFilePath, remoteFilePath); + LocalResource localResource = sliderFileSystem + .createAmResource(remoteConfPath, LocalResourceType.FILE); + localResources.put(localFilePath.getName(), localResource); + hasSliderAMLog4j = true; + } + } + return hasSliderAMLog4j; + } + + private Path checkAppNotExistOnHdfs(Application application) + throws IOException, SliderException { + Path appDir = sliderFileSystem.buildClusterDirPath(application.getName()); + sliderFileSystem.verifyDirectoryNonexistent( + new Path(appDir, application.getName() + ".json")); + return appDir; } - // returns true if and only if app is in RUNNING state - private boolean isAppInRunningState(String clustername) throws YarnException, - IOException { - // is this actually a known cluster? - sliderFileSystem.locateInstanceDefinition(clustername); - ApplicationReport app = findInstance(clustername); - if (app == null) { - // exit early - log.info("Cluster {} not running", clustername); - return false; - } - log.debug("App to upgrade was found: {}:\n{}", clustername, - new OnDemandReportStringifier(app)); - if (app.getYarnApplicationState().ordinal() >= YarnApplicationState.FINISHED.ordinal()) { - log.info("Cluster {} is in a terminated state {}. Use command '{}' instead.", - clustername, app.getYarnApplicationState(), ACTION_UPDATE); - return false; + private void persistApp(Path appDir, Application application) + throws IOException, SliderException { + FsPermission appDirPermission = new FsPermission("777"); + sliderFileSystem.createWithPermissions(appDir, appDirPermission); + Path appJson = new Path(appDir, application.getName() + ".json"); + jsonSerDeser + .save(sliderFileSystem.getFileSystem(), appJson, application, true); + log.info( + "Persisted application " + application.getName() + " at " + appJson); + } + + private void addKeytabResourceIfSecure(SliderFileSystem fileSystem, + Map localResource, Configuration conf, + String appName) throws IOException, BadConfigException { + if (!UserGroupInformation.isSecurityEnabled()) { + return; } - - // IPC request to upgrade containers is possible if the app is running. - if (app.getYarnApplicationState().ordinal() < YarnApplicationState.RUNNING - .ordinal()) { - log.info("Cluster {} is in a pre-running state {}. To upgrade it needs " - + "to be RUNNING.", clustername, app.getYarnApplicationState()); - return false; + String keytabPreInstalledOnHost = + conf.get(SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH); + if (StringUtils.isEmpty(keytabPreInstalledOnHost)) { + String amKeytabName = + conf.get(SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME); + String keytabDir = conf.get(SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR); + Path keytabPath = + fileSystem.buildKeytabPath(keytabDir, amKeytabName, appName); + if (fileSystem.getFileSystem().exists(keytabPath)) { + LocalResource keytabRes = + fileSystem.createAmResource(keytabPath, LocalResourceType.FILE); + localResource + .put(SliderKeys.KEYTAB_DIR + "/" + amKeytabName, keytabRes); + log.info("Adding AM keytab on hdfs: " + keytabPath); + } else { + log.warn("No keytab file was found at {}.", keytabPath); + if (conf.getBoolean(KEY_AM_LOGIN_KEYTAB_REQUIRED, false)) { + throw new BadConfigException("No keytab file was found at %s.", + keytabPath); + } else { + log.warn("The AM will be " + + "started without a kerberos authenticated identity. " + + "The application is therefore not guaranteed to remain " + + "operational beyond 24 hours."); + } + } } + } - return true; + @Override + public int actionUpgrade(String clustername, ActionUpgradeArgs upgradeArgs) + throws YarnException, IOException { + //TODO + return 0; } protected static void checkForCredentials(Configuration conf, @@ -952,15 +980,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe } } - private static char[] readOnePassword(String alias) throws IOException { - Console console = System.console(); - if (console == null) { - throw new IOException("Unable to input password for " + alias + - " because System.console() is null"); - } - return readPassword(alias, console); - } - private static char[] readPassword(String alias, Console console) throws IOException { char[] cred = null; @@ -987,16 +1006,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe } @Override - public int actionBuild(String clustername, - AbstractClusterBuildingActionArgs buildInfo) throws - YarnException, - IOException { - - buildInstanceDefinition(clustername, buildInfo, false, false); - return EXIT_SUCCESS; - } - - @Override public int actionKeytab(ActionKeytabArgs keytabInfo) throws YarnException, IOException { if (keytabInfo.install) { @@ -1527,12 +1536,12 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe if (buildInfo.lifetime > 0) { updateLifetime(clustername, buildInfo.lifetime); } else { - buildInstanceDefinition(clustername, buildInfo, true, true); + //TODO upgrade } return EXIT_SUCCESS; } - public void updateLifetime(String appName, long lifetime) + public String updateLifetime(String appName, long lifetime) throws YarnException, IOException { EnumSet appStates = EnumSet.range( YarnApplicationState.NEW, YarnApplicationState.RUNNING); @@ -1553,396 +1562,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe log.info("Successfully updated lifetime for an application: appName = " + appName + ", appId = " + appId + ". New expiry time in ISO8601 format is " + newTimeout); - } - - /** - * Build up the AggregateConfiguration for an application instance then - * persists it - * @param clustername name of the cluster - * @param buildInfo the arguments needed to build the cluster - * @param overwrite true if existing cluster directory can be overwritten - * @param liveClusterAllowed true if live cluster can be modified - * @throws YarnException - * @throws IOException - */ - - public void buildInstanceDefinition(String clustername, - AbstractClusterBuildingActionArgs buildInfo, boolean overwrite, - boolean liveClusterAllowed) throws YarnException, IOException { - buildInstanceDefinition(clustername, buildInfo, overwrite, - liveClusterAllowed, false); - } - - public void buildInstanceDefinition(String clustername, - AbstractClusterBuildingActionArgs buildInfo, boolean overwrite, - boolean liveClusterAllowed, boolean isUpgradeFlow) throws YarnException, - IOException { - // verify that a live cluster isn't there - validateClusterName(clustername); - verifyBindingsDefined(); - if (!liveClusterAllowed) { - verifyNoLiveClusters(clustername, "Create"); - } - - Configuration conf = getConfig(); - String registryQuorum = lookupZKQuorum(); - - Path appconfdir = buildInfo.getConfdir(); - // Provider - String providerName = buildInfo.getProvider(); - requireArgumentSet(Arguments.ARG_PROVIDER, providerName); - log.debug("Provider is {}", providerName); - SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf); - AbstractClientProvider provider = - createClientProvider(providerName); - InstanceBuilder builder = - new InstanceBuilder(sliderFileSystem, - getConfig(), - clustername); - - AggregateConf instanceDefinition = new AggregateConf(); - ConfTreeOperations appConf = instanceDefinition.getAppConfOperations(); - ConfTreeOperations resources = instanceDefinition.getResourceOperations(); - ConfTreeOperations internal = instanceDefinition.getInternalOperations(); - //initial definition is set by the providers - sliderAM.prepareInstanceConfiguration(instanceDefinition); - provider.prepareInstanceConfiguration(instanceDefinition); - - //load in any specified on the command line - if (buildInfo.resources != null) { - try { - resources.mergeFile(buildInfo.resources, - new ResourcesInputPropertiesValidator()); - - } catch (IOException e) { - throw new BadConfigException(e, - "incorrect argument to %s: \"%s\" : %s ", - Arguments.ARG_RESOURCES, - buildInfo.resources, - e.toString()); - } - } - if (buildInfo.template != null) { - try { - appConf.mergeFile(buildInfo.template, - new TemplateInputPropertiesValidator()); - } catch (IOException e) { - throw new BadConfigException(e, - "incorrect argument to %s: \"%s\" : %s ", - Arguments.ARG_TEMPLATE, - buildInfo.template, - e.toString()); - } - } - - if (isUpgradeFlow) { - ActionUpgradeArgs upgradeInfo = (ActionUpgradeArgs) buildInfo; - if (!upgradeInfo.force) { - validateClientAndClusterResource(clustername, resources); - } - } - - //get the command line options - ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree(); - ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree(); - - appConf.merge(cmdLineAppOptions); - - AppDefinitionPersister appDefinitionPersister = new AppDefinitionPersister(sliderFileSystem); - appDefinitionPersister.processSuppliedDefinitions(clustername, buildInfo, appConf); - - // put the role counts into the resources file - Map argsRoleMap = buildInfo.getComponentMap(); - for (Map.Entry roleEntry : argsRoleMap.entrySet()) { - String count = roleEntry.getValue(); - String key = roleEntry.getKey(); - log.info("{} => {}", key, count); - resources.getOrAddComponent(key).put(COMPONENT_INSTANCES, count); - } - - //all CLI role options - Map> appOptionMap = - buildInfo.getCompOptionMap(); - appConf.mergeComponents(appOptionMap); - - //internal picks up core. values only - internal.propagateGlobalKeys(appConf, "slider."); - internal.propagateGlobalKeys(appConf, "internal."); - - //copy over role. and yarn. values ONLY to the resources - if (PROPAGATE_RESOURCE_OPTION) { - resources.propagateGlobalKeys(appConf, "component."); - resources.propagateGlobalKeys(appConf, "role."); - resources.propagateGlobalKeys(appConf, "yarn."); - resources.mergeComponentsPrefix(appOptionMap, "component.", true); - resources.mergeComponentsPrefix(appOptionMap, "yarn.", true); - resources.mergeComponentsPrefix(appOptionMap, "role.", true); - } - - // resource component args - appConf.merge(cmdLineResourceOptions); - resources.merge(cmdLineResourceOptions); - resources.mergeComponents(buildInfo.getResourceCompOptionMap()); - - builder.init(providerName, instanceDefinition); - builder.resolve(); - builder.propagateFilename(); - builder.propagatePrincipals(); - builder.setImageDetailsIfAvailable(buildInfo.getImage(), - buildInfo.getAppHomeDir()); - builder.setQueue(buildInfo.queue); - - String quorum = buildInfo.getZKhosts(); - if (isUnset(quorum)) { - quorum = registryQuorum; - } - if (isUnset(quorum)) { - throw new BadConfigException(E_NO_ZOOKEEPER_QUORUM); - } - ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(), - getUsername(), - clustername, - registryQuorum, - quorum); - String zookeeperRoot = buildInfo.getAppZKPath(); - - if (isSet(zookeeperRoot)) { - zkPaths.setAppPath(zookeeperRoot); - } else { - String createDefaultZkNode = appConf.getGlobalOptions() - .getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false"); - if (createDefaultZkNode.equals("true")) { - String defaultZKPath = createZookeeperNode(clustername, false); - log.debug("ZK node created for application instance: {}", defaultZKPath); - if (defaultZKPath != null) { - zkPaths.setAppPath(defaultZKPath); - } - } else { - // create AppPath if default is being used - String defaultZKPath = createZookeeperNode(clustername, true); - log.debug("ZK node assigned to application instance: {}", defaultZKPath); - zkPaths.setAppPath(defaultZKPath); - } - } - - builder.addZKBinding(zkPaths); - - //then propagate any package URI - if (buildInfo.packageURI != null) { - appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI); - } - - propagatePythonExecutable(conf, instanceDefinition); - - // make any substitutions needed at this stage - replaceTokens(appConf.getConfTree(), getUsername(), clustername); - - // TODO: Refactor the validation code and persistence code - try { - persistInstanceDefinition(overwrite, appconfdir, builder); - appDefinitionPersister.persistPackages(); - - } catch (LockAcquireFailedException e) { - log.warn("Failed to get a Lock on {} : {}", builder, e, e); - throw new BadClusterStateException("Failed to save " + clustername - + ": " + e); - } - - // providers to validate what there is - // TODO: Validation should be done before persistence - AggregateConf instanceDescription = builder.getInstanceDescription(); - validateInstanceDefinition(sliderAM, instanceDescription, sliderFileSystem); - validateInstanceDefinition(provider, instanceDescription, sliderFileSystem); - } - - private void validateClientAndClusterResource(String clustername, - ConfTreeOperations clientResources) throws BadClusterStateException, - SliderException, IOException { - log.info("Validating upgrade resource definition with current cluster " - + "state (components and instance count)"); - Map clientComponentInstances = new HashMap<>(); - for (String componentName : clientResources.getComponentNames()) { - if (!SliderKeys.COMPONENT_AM.equals(componentName)) { - clientComponentInstances.put(componentName, clientResources - .getComponentOptInt(componentName, - COMPONENT_INSTANCES, -1)); - } - } - - AggregateConf clusterConf = null; - try { - clusterConf = loadPersistedClusterDescription(clustername); - } catch (LockAcquireFailedException e) { - log.warn("Failed to get a Lock on cluster resource : {}", e, e); - throw new BadClusterStateException( - "Failed to load client resource definition " + clustername + ": " + e, e); - } - Map clusterComponentInstances = new HashMap<>(); - for (Map.Entry> component : clusterConf - .getResources().components.entrySet()) { - if (!SliderKeys.COMPONENT_AM.equals(component.getKey())) { - clusterComponentInstances.put( - component.getKey(), - Integer.decode(component.getValue().get( - COMPONENT_INSTANCES))); - } - } - - // client and cluster should be an exact match - Iterator> clientComponentInstanceIt = clientComponentInstances - .entrySet().iterator(); - while (clientComponentInstanceIt.hasNext()) { - Map.Entry clientComponentInstanceEntry = clientComponentInstanceIt.next(); - if (clusterComponentInstances.containsKey(clientComponentInstanceEntry.getKey())) { - // compare instance count now and remove from both maps if they match - if (clusterComponentInstances - .get(clientComponentInstanceEntry.getKey()).intValue() == clientComponentInstanceEntry - .getValue().intValue()) { - clusterComponentInstances.remove(clientComponentInstanceEntry - .getKey()); - clientComponentInstanceIt.remove(); - } - } - } - - if (!clientComponentInstances.isEmpty() - || !clusterComponentInstances.isEmpty()) { - log.error("Mismatch found in upgrade resource definition and cluster " - + "resource state"); - if (!clientComponentInstances.isEmpty()) { - log.info("The upgrade resource definitions that do not match are:"); - for (Map.Entry clientComponentInstanceEntry : clientComponentInstances - .entrySet()) { - log.info(" Component Name: {}, Instance count: {}", - clientComponentInstanceEntry.getKey(), - clientComponentInstanceEntry.getValue()); - } - } - if (!clusterComponentInstances.isEmpty()) { - log.info("The cluster resources that do not match are:"); - for (Map.Entry clusterComponentInstanceEntry : clusterComponentInstances - .entrySet()) { - log.info(" Component Name: {}, Instance count: {}", - clusterComponentInstanceEntry.getKey(), - clusterComponentInstanceEntry.getValue()); - } - } - throw new BadConfigException("Resource definition provided for " - + "upgrade does not match with that of the currently running " - + "cluster.\nIf you are aware of what you are doing, rerun the " - + "command with " + Arguments.ARG_FORCE + " option."); - } - } - - protected void persistInstanceDefinition(boolean overwrite, - Path appconfdir, - InstanceBuilder builder) - throws IOException, SliderException, LockAcquireFailedException { - builder.persist(appconfdir, overwrite); - } - - @VisibleForTesting - public static void replaceTokens(ConfTree conf, - String userName, String clusterName) throws IOException { - Map newglobal = new HashMap<>(); - for (Entry entry : conf.global.entrySet()) { - newglobal.put(entry.getKey(), replaceTokens(entry.getValue(), - userName, clusterName)); - } - conf.global.putAll(newglobal); - - for (String component : conf.components.keySet()) { - Map newComponent = new HashMap<>(); - for (Entry entry : conf.components.get(component).entrySet()) { - newComponent.put(entry.getKey(), replaceTokens(entry.getValue(), - userName, clusterName)); - } - conf.components.get(component).putAll(newComponent); - } - - Map> newcred = new HashMap<>(); - for (Entry> entry : conf.credentials.entrySet()) { - List resultList = new ArrayList<>(); - for (String v : entry.getValue()) { - resultList.add(replaceTokens(v, userName, clusterName)); - } - newcred.put(replaceTokens(entry.getKey(), userName, clusterName), - resultList); - } - conf.credentials.clear(); - conf.credentials.putAll(newcred); - } - - private static String replaceTokens(String s, String userName, - String clusterName) throws IOException { - return s.replaceAll(Pattern.quote("${USER}"), userName) - .replaceAll(Pattern.quote("${USER_NAME}"), userName); - } - - public FsPermission getClusterDirectoryPermissions(Configuration conf) { - String clusterDirPermsOct = - conf.get(CLUSTER_DIRECTORY_PERMISSIONS, DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS); - return new FsPermission(clusterDirPermsOct); - } - - /** - * Verify that the Resource Manager is configured (on a non-HA cluster). - * with a useful error message - * @throws BadCommandArgumentsException the exception raised on an invalid config - */ - public void verifyBindingsDefined() throws BadCommandArgumentsException { - InetSocketAddress rmAddr = getRmAddress(getConfig()); - if (!getConfig().getBoolean(YarnConfiguration.RM_HA_ENABLED, false) - && !isAddressDefined(rmAddr)) { - throw new BadCommandArgumentsException( - E_NO_RESOURCE_MANAGER - + " in the argument " - + Arguments.ARG_MANAGER - + " or the configuration property " - + YarnConfiguration.RM_ADDRESS - + " value :" + rmAddr); - } - } - - /** - * Load and start a cluster specification. - * This assumes that all validation of args and cluster state - * have already taken place - * - * @param clustername name of the cluster. - * @param launchArgs launch arguments - * @param lifetime - * @return the exit code - * @throws YarnException - * @throws IOException - */ - protected int startCluster(String clustername, LaunchArgsAccessor launchArgs, - long lifetime) throws YarnException, IOException { - Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); - AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( - clustername, - clusterDirectory); - - LaunchedApplication launchedApplication = - launchApplication(clustername, clusterDirectory, instanceDefinition, - serviceArgs.isDebug(), lifetime); - - if (launchArgs.getOutputFile() != null) { - // output file has been requested. Get the app report and serialize it - ApplicationReport report = - launchedApplication.getApplicationReport(); - SerializedApplicationReport sar = new SerializedApplicationReport(report); - sar.submitTime = System.currentTimeMillis(); - ApplicationReportSerDeser serDeser = new ApplicationReportSerDeser(); - serDeser.save(sar, launchArgs.getOutputFile()); - } - int waittime = launchArgs.getWaittime(); - if (waittime > 0) { - return waitForAppRunning(launchedApplication, waittime, waittime); - } else { - // no waiting - return EXIT_SUCCESS; - } + return newTimeout; } /** @@ -1968,415 +1588,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe } } - /** - * Load the instance definition. - * @param name cluster name - * @param resolved flag to indicate the cluster should be resolved - * @return the loaded configuration - * @throws IOException IO problems - * @throws SliderException slider explicit issues - * @throws UnknownApplicationInstanceException if the file is not found - */ - public AggregateConf loadInstanceDefinition(String name, - boolean resolved) throws - IOException, - SliderException { - - Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name); - AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( - name, - clusterDirectory); - if (resolved) { - instanceDefinition.resolve(); - } - return instanceDefinition; - - } - - protected AppMasterLauncher setupAppMasterLauncher(String clustername, - Path clusterDirectory, AggregateConf instanceDefinition, boolean debugAM, - long lifetime) - throws YarnException, IOException{ - deployedClusterName = clustername; - validateClusterName(clustername); - verifyNoLiveClusters(clustername, "Launch"); - Configuration config = getConfig(); - lookupZKQuorum(); - boolean clusterSecure = isHadoopClusterSecure(config); - //create the Slider AM provider -this helps set up the AM - SliderAMClientProvider sliderAM = new SliderAMClientProvider(config); - - instanceDefinition.resolve(); - launchedInstanceDefinition = instanceDefinition; - - ConfTreeOperations internalOperations = instanceDefinition.getInternalOperations(); - MapOperations internalOptions = internalOperations.getGlobalOptions(); - ConfTreeOperations resourceOperations = instanceDefinition.getResourceOperations(); - ConfTreeOperations appOperations = instanceDefinition.getAppConfOperations(); - Path generatedConfDirPath = - createPathThatMustExist(internalOptions.getMandatoryOption( - INTERNAL_GENERATED_CONF_PATH)); - Path snapshotConfPath = - createPathThatMustExist(internalOptions.getMandatoryOption( - INTERNAL_SNAPSHOT_CONF_PATH)); - - - // cluster Provider - AbstractClientProvider provider = createClientProvider( - internalOptions.getMandatoryOption(INTERNAL_PROVIDER_NAME)); - if (log.isDebugEnabled()) { - log.debug(instanceDefinition.toString()); - } - MapOperations sliderAMResourceComponent = - resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM); - MapOperations resourceGlobalOptions = resourceOperations.getGlobalOptions(); - - // add the tags if available - Set applicationTags = provider.getApplicationTags(sliderFileSystem, - appOperations, clustername); - - Credentials credentials = null; - if (clusterSecure) { - // pick up oozie credentials - credentials = CredentialUtils.loadTokensFromEnvironment(System.getenv(), - config); - if (credentials == null) { - // nothing from oozie, so build up directly - credentials = new Credentials( - UserGroupInformation.getCurrentUser().getCredentials()); - CredentialUtils.addRMRenewableFSDelegationTokens(config, - sliderFileSystem.getFileSystem(), - credentials); - CredentialUtils.addRMDelegationToken(yarnClient, credentials); - - } else { - log.info("Using externally supplied credentials to launch AM"); - } - } - - AppMasterLauncher amLauncher = new AppMasterLauncher(clustername, - SliderKeys.APP_TYPE, - config, - sliderFileSystem, - yarnClient, - clusterSecure, - sliderAMResourceComponent, - resourceGlobalOptions, - applicationTags, - credentials); - - ApplicationId appId = amLauncher.getApplicationId(); - // set the application name; - amLauncher.setKeepContainersOverRestarts(true); - // set lifetime in submission context; - Map appTimeout = new HashMap<>(); - if (lifetime > 0) { - appTimeout.put(ApplicationTimeoutType.LIFETIME, lifetime); - } - amLauncher.submissionContext.setApplicationTimeouts(appTimeout); - int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0); - amLauncher.setMaxAppAttempts(maxAppAttempts); - - sliderFileSystem.purgeAppInstanceTempFiles(clustername); - Path tempPath = sliderFileSystem.createAppInstanceTempPath( - clustername, - appId.toString() + "/am"); - String libdir = "lib"; - Path libPath = new Path(tempPath, libdir); - sliderFileSystem.getFileSystem().mkdirs(libPath); - log.debug("FS={}, tempPath={}, libdir={}", sliderFileSystem, tempPath, libPath); - - // set local resources for the application master - // local files or archives as needed - // In this scenario, the jar file for the application master is part of the local resources - Map localResources = amLauncher.getLocalResources(); - - // look for the configuration directory named on the command line - boolean hasServerLog4jProperties = false; - Path remoteConfPath = null; - String relativeConfDir = null; - String confdirProp = System.getProperty(SliderKeys.PROPERTY_CONF_DIR); - if (isUnset(confdirProp)) { - log.debug("No local configuration directory provided as system property"); - } else { - File confDir = new File(confdirProp); - if (!confDir.exists()) { - throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND, - confDir); - } - Path localConfDirPath = createLocalPath(confDir); - remoteConfPath = new Path(clusterDirectory, SliderKeys.SUBMITTED_CONF_DIR); - log.debug("Slider configuration directory is {}; remote to be {}", - localConfDirPath, remoteConfPath); - copyDirectory(config, localConfDirPath, remoteConfPath, null); - - File log4jserver = - new File(confDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME); - hasServerLog4jProperties = log4jserver.isFile(); - } - if (!hasServerLog4jProperties) { - // check for log4j properties in hadoop conf dir - String hadoopConfDir = System.getenv(ApplicationConstants.Environment - .HADOOP_CONF_DIR.name()); - if (hadoopConfDir != null) { - File localFile = new File(hadoopConfDir, SliderKeys - .LOG4J_SERVER_PROP_FILENAME); - if (localFile.exists()) { - Path localFilePath = createLocalPath(localFile); - remoteConfPath = new Path(clusterDirectory, - SliderKeys.SUBMITTED_CONF_DIR); - Path remoteFilePath = new Path(remoteConfPath, SliderKeys - .LOG4J_SERVER_PROP_FILENAME); - copy(config, localFilePath, remoteFilePath); - hasServerLog4jProperties = true; - } - } - } - // the assumption here is that minimr cluster => this is a test run - // and the classpath can look after itself - - boolean usingMiniMRCluster = getUsingMiniMRCluster(); - if (!usingMiniMRCluster) { - - log.debug("Destination is not a MiniYARNCluster -copying full classpath"); - - // insert conf dir first - if (remoteConfPath != null) { - relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR; - Map submittedConfDir = - sliderFileSystem.submitDirectory(remoteConfPath, - relativeConfDir); - mergeMaps(localResources, submittedConfDir); - } - } - // build up the configuration - // IMPORTANT: it is only after this call that site configurations - // will be valid. - - propagatePrincipals(config, instanceDefinition); - // validate security data - -/* - // turned off until tested - SecurityConfiguration securityConfiguration = - new SecurityConfiguration(config, - instanceDefinition, clustername); - -*/ - Configuration clientConfExtras = new Configuration(false); - // then build up the generated path. - FsPermission clusterPerms = getClusterDirectoryPermissions(config); - copyDirectory(config, snapshotConfPath, generatedConfDirPath, - clusterPerms); - - - // standard AM resources - sliderAM.prepareAMAndConfigForLaunch(sliderFileSystem, - config, - amLauncher, - instanceDefinition, - snapshotConfPath, - generatedConfDirPath, - clientConfExtras, - libdir, - tempPath, - usingMiniMRCluster); - //add provider-specific resources - provider.prepareAMAndConfigForLaunch(sliderFileSystem, - config, - amLauncher, - instanceDefinition, - snapshotConfPath, - generatedConfDirPath, - clientConfExtras, - libdir, - tempPath, - usingMiniMRCluster); - - // now that the site config is fully generated, the provider gets - // to do a quick review of them. - log.debug("Preflight validation of cluster configuration"); - - - sliderAM.preflightValidateClusterConfiguration(sliderFileSystem, - clustername, - config, - instanceDefinition, - clusterDirectory, - generatedConfDirPath, - clusterSecure - ); - - provider.preflightValidateClusterConfiguration(sliderFileSystem, - clustername, - config, - instanceDefinition, - clusterDirectory, - generatedConfDirPath, - clusterSecure - ); - - - if (!(provider instanceof DockerClientProvider)) { - Path imagePath = - extractImagePath(sliderFileSystem, internalOptions); - if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) { - log.debug("Registered image path {}", imagePath); - } - } - - // build the environment - amLauncher.putEnv( - buildEnvMap(sliderAMResourceComponent)); - ClasspathConstructor classpath = buildClasspath(relativeConfDir, - libdir, - getConfig(), - sliderFileSystem, - usingMiniMRCluster); - amLauncher.setClasspath(classpath); - //add english env - amLauncher.setEnv("LANG", "en_US.UTF-8"); - amLauncher.setEnv("LC_ALL", "en_US.UTF-8"); - amLauncher.setEnv("LANGUAGE", "en_US.UTF-8"); - amLauncher.maybeSetEnv(HADOOP_JAAS_DEBUG, - System.getenv(HADOOP_JAAS_DEBUG)); - amLauncher.putEnv(getAmLaunchEnv(config)); - - for (Map.Entry envs : getSystemEnv().entrySet()) { - log.debug("System env {}={}", envs.getKey(), envs.getValue()); - } - if (log.isDebugEnabled()) { - log.debug("AM classpath={}", classpath); - log.debug("Environment Map:\n{}", - stringifyMap(amLauncher.getEnv())); - log.debug("Files in lib path\n{}", sliderFileSystem.listFSDir(libPath)); - } - - // rm address - - InetSocketAddress rmSchedulerAddress; - try { - rmSchedulerAddress = getRmSchedulerAddress(config); - } catch (IllegalArgumentException e) { - throw new BadConfigException("%s Address invalid: %s", - YarnConfiguration.RM_SCHEDULER_ADDRESS, - config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS)); - } - String rmAddr = NetUtils.getHostPortString(rmSchedulerAddress); - - JavaCommandLineBuilder commandLine = new JavaCommandLineBuilder(); - // insert any JVM options); - sliderAM.addJVMOptions(instanceDefinition, commandLine); - // enable asserts - commandLine.enableJavaAssertions(); - - // if the conf dir has a slideram-log4j.properties, switch to that - if (hasServerLog4jProperties) { - commandLine.sysprop(SYSPROP_LOG4J_CONFIGURATION, LOG4J_SERVER_PROP_FILENAME); - commandLine.sysprop(SYSPROP_LOG_DIR, ApplicationConstants.LOG_DIR_EXPANSION_VAR); - } - - // add the AM sevice entry point - commandLine.add(SliderAppMaster.SERVICE_CLASSNAME); - - // create action and the cluster name - commandLine.add(ACTION_CREATE, clustername); - - // debug - if (debugAM) { - commandLine.add(Arguments.ARG_DEBUG); - } - - // set the cluster directory path - commandLine.add(Arguments.ARG_CLUSTER_URI, clusterDirectory.toUri()); - - if (!isUnset(rmAddr)) { - commandLine.add(Arguments.ARG_RM_ADDR, rmAddr); - } - - if (serviceArgs.getFilesystemBinding() != null) { - commandLine.add(Arguments.ARG_FILESYSTEM, serviceArgs.getFilesystemBinding()); - } - - // pass the registry binding - commandLine.addConfOptionToCLI(config, RegistryConstants.KEY_REGISTRY_ZK_ROOT, - RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT); - commandLine.addMandatoryConfOption(config, RegistryConstants.KEY_REGISTRY_ZK_QUORUM); - - if (clusterSecure) { - // if the cluster is secure, make sure that - // the relevant security settings go over - commandLine.addConfOption(config, DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY); - } - - // copy over any/all YARN RM client values, in case the server-side XML conf file - // has the 0.0.0.0 address - commandLine.addConfOptions(config, - YarnConfiguration.RM_ADDRESS, - YarnConfiguration.RM_CLUSTER_ID, - YarnConfiguration.RM_HOSTNAME, - YarnConfiguration.RM_PRINCIPAL); - - // write out the path output - commandLine.addOutAndErrFiles(STDOUT_AM, STDERR_AM); - - String cmdStr = commandLine.build(); - log.debug("Completed setting up app master command {}", cmdStr); - - amLauncher.addCommandLine(commandLine); - - // the Slider AM gets to configure the AM requirements, not the custom provider - sliderAM.prepareAMResourceRequirements(sliderAMResourceComponent, - amLauncher.getResource()); - - - // Set the priority for the application master - amLauncher.setPriority(config.getInt(KEY_YARN_QUEUE_PRIORITY, - DEFAULT_YARN_QUEUE_PRIORITY)); - - // Set the queue to which this application is to be submitted in the RM - // Queue for App master - String amQueue = config.get(KEY_YARN_QUEUE, DEFAULT_YARN_QUEUE); - String suppliedQueue = internalOperations.getGlobalOptions().get(INTERNAL_QUEUE); - if(!isUnset(suppliedQueue)) { - amQueue = suppliedQueue; - log.info("Using queue {} for the application instance.", amQueue); - } - - if (isSet(amQueue)) { - amLauncher.setQueue(amQueue); - } - return amLauncher; - } - - /** - * - * @param clustername name of the cluster - * @param clusterDirectory cluster dir - * @param instanceDefinition the instance definition - * @param debugAM enable debug AM options - * @param lifetime - * @return the launched application - * @throws YarnException - * @throws IOException - */ - public LaunchedApplication launchApplication(String clustername, Path clusterDirectory, - AggregateConf instanceDefinition, boolean debugAM, long lifetime) - throws YarnException, IOException { - - AppMasterLauncher amLauncher = setupAppMasterLauncher(clustername, - clusterDirectory, - instanceDefinition, - debugAM, lifetime); - - applicationId = amLauncher.getApplicationId(); - log.info("Submitting application {}", applicationId); - - // submit the application - LaunchedApplication launchedApplication = amLauncher.submitApplication(); - return launchedApplication; - } - protected Map getAmLaunchEnv(Configuration config) { String sliderAmLaunchEnv = config.get(KEY_AM_LAUNCH_ENV); log.debug("{} = {}", KEY_AM_LAUNCH_ENV, sliderAmLaunchEnv); @@ -2431,95 +1642,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe return placeholderKeyValueMap; } - private void propagatePythonExecutable(Configuration config, - AggregateConf instanceDefinition) { - String pythonExec = config.get( - PYTHON_EXECUTABLE_PATH); - if (pythonExec != null) { - instanceDefinition.getAppConfOperations().getGlobalOptions().putIfUnset( - PYTHON_EXECUTABLE_PATH, - pythonExec); - } - } - - - /** - * Wait for the launched app to be accepted in the time - * and, optionally running. - *

- * If the application - * - * @param launchedApplication application - * @param acceptWaitMillis time in millis to wait for accept - * @param runWaitMillis time in millis to wait for the app to be running. - * May be null, in which case no wait takes place - * @return exit code: success - * @throws YarnException - * @throws IOException - */ - public int waitForAppRunning(LaunchedApplication launchedApplication, - int acceptWaitMillis, int runWaitMillis) throws YarnException, IOException { - assert launchedApplication != null; - int exitCode; - // wait for the submit state to be reached - ApplicationReport report = launchedApplication.monitorAppToState( - YarnApplicationState.ACCEPTED, - new Duration(acceptWaitMillis)); - - // may have failed, so check that - if (hasAppFinished(report)) { - exitCode = buildExitCode(report); - } else { - // exit unless there is a wait - - - if (runWaitMillis != 0) { - // waiting for state to change - Duration duration = new Duration(runWaitMillis * 1000); - duration.start(); - report = launchedApplication.monitorAppToState( - YarnApplicationState.RUNNING, duration); - if (report != null && - report.getYarnApplicationState() == YarnApplicationState.RUNNING) { - exitCode = EXIT_SUCCESS; - } else { - exitCode = buildExitCode(report); - } - } else { - exitCode = EXIT_SUCCESS; - } - } - return exitCode; - } - - - /** - * Propagate any critical principals from the current site config down to the HBase one. - * @param config config to read from - * @param clusterSpec cluster spec - */ - private void propagatePrincipals(Configuration config, - AggregateConf clusterSpec) { - String dfsPrincipal = config.get(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY); - if (dfsPrincipal != null) { - String siteDfsPrincipal = SITE_XML_PREFIX + DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY; - clusterSpec.getAppConfOperations().getGlobalOptions().putIfUnset( - siteDfsPrincipal, - dfsPrincipal); - } - } - - /** - * Create a path that must exist in the cluster fs - * @param uri uri to create - * @return the path - * @throws FileNotFoundException if the path does not exist - */ - public Path createPathThatMustExist(String uri) throws - SliderException, IOException { - return sliderFileSystem.createPathThatMustExist(uri); - } - /** * verify that a live cluster isn't there * @param clustername cluster name @@ -2527,7 +1649,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe * @throws SliderException with exit code EXIT_CLUSTER_LIVE * if a cluster of that name is either live or starting up. */ - public void verifyNoLiveClusters(String clustername, String action) throws + public void verifyNoLiveApp(String clustername, String action) throws IOException, YarnException { List existing = findAllLiveInstances(clustername); @@ -2554,11 +1676,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe return deployedClusterName; } - @VisibleForTesting - public void setDeployedClusterName(String deployedClusterName) { - this.deployedClusterName = deployedClusterName; - } - /** * ask if the client is using a mini MR cluster * @return true if they are @@ -2568,109 +1685,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe false); } - /** - * Get the application name used in the zookeeper root paths - * @return an application-specific path in ZK - */ - private String getAppName() { - return "slider"; - } - - /** - * Wait for the app to start running (or go past that state) - * @param duration time to wait - * @return the app report; null if the duration turned out - * @throws YarnException YARN or app issues - * @throws IOException IO problems - */ - @VisibleForTesting - public ApplicationReport monitorAppToRunning(Duration duration) - throws YarnException, IOException { - return monitorAppToState(YarnApplicationState.RUNNING, duration); - } - - /** - * Build an exit code for an application from its report. - * If the report parameter is null, its interpreted as a timeout - * @param report report application report - * @return the exit code - * @throws IOException - * @throws YarnException - */ - private int buildExitCode(ApplicationReport report) throws - IOException, - YarnException { - if (null == report) { - return EXIT_TIMED_OUT; - } - - YarnApplicationState state = report.getYarnApplicationState(); - FinalApplicationStatus dsStatus = report.getFinalApplicationStatus(); - switch (state) { - case FINISHED: - if (FinalApplicationStatus.SUCCEEDED == dsStatus) { - log.info("Application has completed successfully"); - return EXIT_SUCCESS; - } else { - log.info("Application finished unsuccessfully." + - "YarnState = {}, DSFinalStatus = {} Breaking monitoring loop", - state, dsStatus); - return EXIT_YARN_SERVICE_FINISHED_WITH_ERROR; - } - - case KILLED: - log.info("Application did not finish. YarnState={}, DSFinalStatus={}", - state, dsStatus); - return EXIT_YARN_SERVICE_KILLED; - - case FAILED: - log.info("Application Failed. YarnState={}, DSFinalStatus={}", state, - dsStatus); - return EXIT_YARN_SERVICE_FAILED; - - default: - //not in any of these states - return EXIT_SUCCESS; - } - } - - /** - * Monitor the submitted application for reaching the requested state. - * Will also report if the app reaches a later state (failed, killed, etc) - * Kill application if duration!= null & time expires. - * Prerequisite: the applicatin was launched. - * @param desiredState desired state. - * @param duration how long to wait -must be more than 0 - * @return the application report -null on a timeout - * @throws YarnException - * @throws IOException - */ - @VisibleForTesting - public ApplicationReport monitorAppToState( - YarnApplicationState desiredState, - Duration duration) - throws YarnException, IOException { - LaunchedApplication launchedApplication = - new LaunchedApplication(applicationId, yarnClient); - return launchedApplication.monitorAppToState(desiredState, duration); - } - - @Override - public ApplicationReport getApplicationReport() throws - IOException, - YarnException { - return getApplicationReport(applicationId); - } - - @Override - public boolean forceKillApplication(String reason) - throws YarnException, IOException { - if (applicationId != null) { - new LaunchedApplication(applicationId, yarnClient).forceKill(reason); - return true; - } - return false; - } /** * List Slider instances belonging to a specific user with a specific app @@ -2721,23 +1735,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe } /** - * Retrieve a list of all live instances. If clustername is supplied then it - * returns this specific cluster, if and only if it exists and is live. - * - * @param clustername - * cluster name (if looking for a specific live cluster) - * @return the list of application names which satisfies the list criteria - * @throws IOException - * @throws YarnException - */ - public Set getApplicationList(String clustername) - throws IOException, YarnException { - ActionListArgs args = new ActionListArgs(); - args.live = true; - return getApplicationList(clustername, args); - } - - /** * Retrieve a list of application instances satisfying the query criteria. * * @param clustername @@ -2757,8 +1754,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe // the above call throws an exception so the return is not really required return Collections.emptySet(); } - verifyBindingsDefined(); - boolean live = args.live; String state = args.state; boolean listContainers = args.containers; @@ -2868,29 +1863,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe } } - /** - * Enumerate slider instances for the current user, and the - * most recent app report, where available. - * @param listOnlyInState boolean to indicate that the instances should - * only include those in a YARN state - * minAppState <= currentState <= maxAppState - * - * @param minAppState minimum application state to include in enumeration. - * @param maxAppState maximum application state to include - * @return a map of application instance name to description - * @throws IOException Any IO problem - * @throws YarnException YARN problems - */ - @Override - public Map enumSliderInstances( - boolean listOnlyInState, - YarnApplicationState minAppState, - YarnApplicationState maxAppState) - throws IOException, YarnException { - return yarnAppListClient.enumSliderInstances(listOnlyInState, - minAppState, - maxAppState); - } /** * Extract the state of a Yarn application --state argument @@ -2928,22 +1900,16 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe @Override @VisibleForTesting - public int actionFlex(String name, ActionFlexArgs args) throws YarnException, IOException { - validateClusterName(name); - Map roleMap = args.getComponentMap(); - // throw usage exception if no changes proposed - if (roleMap.size() == 0) { - actionHelp(ACTION_FLEX); - } - verifyBindingsDefined(); - log.debug("actionFlex({})", name); - Map roleInstances = new HashMap<>(); - for (Map.Entry roleEntry : roleMap.entrySet()) { - String key = roleEntry.getKey(); - String val = roleEntry.getValue(); - roleInstances.put(key, val); - } - return flex(name, roleInstances); + public void actionFlex(String appName, ActionFlexArgs args) + throws YarnException, IOException { + Component component = new Component(); + component.setNumberOfContainers(args.getNumberOfContainers()); + if (StringUtils.isEmpty(args.getComponent())) { + component.setName("DEFAULT"); + } else { + component.setName(args.getComponent()); + } + flex(appName, component); } @Override @@ -2954,7 +1920,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe } public int actionExists(String name, ActionExistsArgs args) throws YarnException, IOException { - verifyBindingsDefined(); validateClusterName(name); boolean checkLive = args.live; log.debug("actionExists({}, {}, {})", name, checkLive, args.state); @@ -3050,14 +2015,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe } /** - * Get at the service registry operations - * @return registry client -valid after the service is inited. - */ - public YarnAppListClient getYarnAppListClient() { - return yarnAppListClient; - } - - /** * Find an instance of an application belonging to the current user. * @param appname application name * @return the app report or null if none is found @@ -3128,20 +2085,20 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe return EXIT_SUCCESS; } - ClusterDescription status = verifyAndGetClusterDescription(clustername); + Application application = getApplication(clustername); String outfile = statusArgs.getOutput(); if (outfile == null) { - log.info(status.toJsonString()); + log.info(application.toString()); } else { - status.save(new File(outfile).getAbsoluteFile()); + jsonSerDeser.save(application, new File(statusArgs.getOutput())); } return EXIT_SUCCESS; } @Override - public String actionStatus(String clustername) + public Application actionStatus(String clustername) throws YarnException, IOException { - return verifyAndGetClusterDescription(clustername).toJsonString(); + return getApplication(clustername); } private void queryAndPrintLifetime(String appName) @@ -3170,13 +2127,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe } } - private ClusterDescription verifyAndGetClusterDescription(String clustername) - throws YarnException, IOException { - verifyBindingsDefined(); - validateClusterName(clustername); - return getClusterDescription(clustername); - } - @Override public int actionVersion() { SliderVersionInfo.loadAndPrintVersionInfo(log); @@ -3184,269 +2134,106 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe } @Override - public int actionFreeze(String clustername, - ActionFreezeArgs freezeArgs) throws YarnException, IOException { - verifyBindingsDefined(); - validateClusterName(clustername); - int waittime = freezeArgs.getWaittime(); - String text = freezeArgs.message; - boolean forcekill = freezeArgs.force; - log.debug("actionFreeze({}, reason={}, wait={}, force={})", clustername, - text, - waittime, - forcekill); - - //is this actually a known cluster? - sliderFileSystem.locateInstanceDefinition(clustername); - ApplicationReport app = findInstance(clustername); + public void actionStop(String appName, ActionFreezeArgs freezeArgs) + throws YarnException, IOException { + validateClusterName(appName); + ApplicationReport app = findInstance(appName); if (app == null) { - // exit early - log.info("Cluster {} not running", clustername); - // not an error to stop a stopped cluster - return EXIT_SUCCESS; - } - log.debug("App to stop was found: {}:\n{}", clustername, - new OnDemandReportStringifier(app)); - if (app.getYarnApplicationState().ordinal() >= - YarnApplicationState.FINISHED.ordinal()) { - log.info("Cluster {} is in a terminated state {}", clustername, - app.getYarnApplicationState()); - return EXIT_SUCCESS; + throw new ApplicationNotFoundException( + "Application " + appName + " doesn't exist in RM."); } - // IPC request for a managed shutdown is only possible if the app is running. - // so we need to force kill if the app is accepted or submitted - if (!forcekill - && app.getYarnApplicationState().ordinal() < YarnApplicationState.RUNNING.ordinal()) { - log.info("Cluster {} is in a pre-running state {}. Force killing it", clustername, + if (app.getYarnApplicationState().ordinal() >= YarnApplicationState.FINISHED + .ordinal()) { + log.info("Application {} is in a terminated state {}", appName, app.getYarnApplicationState()); - forcekill = true; - } - - LaunchedApplication application = new LaunchedApplication(yarnClient, ap --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org