Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8D604200CFC for ; Wed, 23 Aug 2017 21:32:48 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8BD481668FA; Wed, 23 Aug 2017 19:32:48 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 459B316686F for ; Wed, 23 Aug 2017 21:32:46 +0200 (CEST) Received: (qmail 10583 invoked by uid 500); 23 Aug 2017 19:32:40 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 8868 invoked by uid 99); 23 Aug 2017 19:32:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Aug 2017 19:32:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1880EF5EFD; Wed, 23 Aug 2017 19:32:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: billie@apache.org To: common-commits@hadoop.apache.org Date: Wed, 23 Aug 2017 19:32:57 -0000 Message-Id: <891f0978c1e841be8dd034700ad9ee3c@git.apache.org> In-Reply-To: <5e24151e1de74f54afbc6aa9bf532591@git.apache.org> References: <5e24151e1de74f54afbc6aa9bf532591@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [21/44] hadoop git commit: YARN-7050. Post cleanup after YARN-6903, removal of org.apache.slider package. Contributed by Jian He archived-at: Wed, 23 Aug 2017 19:32:48 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/e00bb2ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java deleted file mode 100644 index 06dde67..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ /dev/null @@ -1,2138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.appmaster; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.health.HealthCheckRegistry; -import com.google.common.base.Preconditions; -import com.google.protobuf.BlockingService; -import org.apache.commons.collections.CollectionUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.http.HttpConfig; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.registry.client.api.RegistryOperations; -import org.apache.hadoop.registry.client.binding.RegistryPathUtils; -import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; -import org.apache.hadoop.registry.client.binding.RegistryUtils; -import org.apache.hadoop.registry.client.types.ServiceRecord; -import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; -import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; -import org.apache.hadoop.registry.server.integration.RMRegistryOperationsService; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.service.Service; -import org.apache.hadoop.service.ServiceOperations; -import org.apache.hadoop.service.ServiceStateChangeListener; -import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.TimelineV2Client; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; -import org.apache.hadoop.yarn.client.api.async.NMClientAsync; -import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; -import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; -import org.apache.hadoop.yarn.service.provider.ProviderService; -import org.apache.hadoop.yarn.service.provider.ProviderFactory; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.webapp.WebAppException; -import org.apache.hadoop.yarn.webapp.WebApps; -import org.apache.hadoop.yarn.webapp.util.WebAppUtils; -import org.apache.slider.api.InternalKeys; -import org.apache.slider.api.ResourceKeys; -import org.apache.slider.api.RoleKeys; -import org.apache.slider.api.proto.Messages; -import org.apache.slider.api.proto.SliderClusterAPI; -import org.apache.slider.api.resource.Application; -import org.apache.slider.api.resource.Component; -import org.apache.hadoop.yarn.service.conf.SliderExitCodes; -import org.apache.hadoop.yarn.service.conf.SliderKeys; -import org.apache.hadoop.yarn.service.client.params.AbstractActionArgs; -import org.apache.hadoop.yarn.service.client.params.SliderAMArgs; -import org.apache.hadoop.yarn.service.client.params.SliderAMCreateAction; -import org.apache.hadoop.yarn.service.client.params.SliderActions; -import org.apache.slider.common.tools.ConfigHelper; -import org.apache.slider.common.tools.PortScanner; -import org.apache.slider.common.tools.SliderFileSystem; -import org.apache.slider.common.tools.SliderUtils; -import org.apache.slider.common.tools.SliderVersionInfo; -import org.apache.slider.core.exceptions.BadConfigException; -import org.apache.slider.core.exceptions.SliderException; -import org.apache.slider.core.exceptions.SliderInternalStateException; -import org.apache.slider.core.exceptions.TriggerClusterTeardownException; -import org.apache.slider.core.launch.CredentialUtils; -import org.apache.slider.core.main.ExitCodeProvider; -import org.apache.slider.core.main.LauncherExitCodes; -import org.apache.slider.core.main.RunService; -import org.apache.slider.core.main.ServiceLauncher; -import org.apache.slider.core.registry.info.CustomRegistryConstants; -import org.apache.slider.providers.ProviderCompleted; -import org.apache.slider.server.appmaster.actions.ActionHalt; -import org.apache.slider.server.appmaster.actions.ActionRegisterServiceInstance; -import org.apache.slider.server.appmaster.actions.ActionStopSlider; -import org.apache.slider.server.appmaster.actions.ActionUpgradeContainers; -import org.apache.slider.server.appmaster.actions.AsyncAction; -import org.apache.slider.server.appmaster.actions.EscalateOutstandingRequests; -import org.apache.slider.server.appmaster.actions.MonitorComponentInstances; -import org.apache.slider.server.appmaster.actions.QueueExecutor; -import org.apache.slider.server.appmaster.actions.QueueService; -import org.apache.slider.server.appmaster.actions.RegisterComponentInstance; -import org.apache.slider.server.appmaster.actions.RenewingAction; -import org.apache.slider.server.appmaster.actions.ResetFailureWindow; -import org.apache.slider.server.appmaster.actions.ReviewAndFlexApplicationSize; -import org.apache.slider.server.appmaster.actions.UnregisterComponentInstance; -import org.apache.slider.server.appmaster.management.MetricsAndMonitoring; -import org.apache.slider.server.appmaster.management.YarnServiceHealthCheck; -import org.apache.slider.server.appmaster.monkey.ChaosKillAM; -import org.apache.slider.server.appmaster.monkey.ChaosKillContainer; -import org.apache.slider.server.appmaster.monkey.ChaosMonkeyService; -import org.apache.slider.server.appmaster.operations.AbstractRMOperation; -import org.apache.slider.server.appmaster.operations.AsyncRMOperationHandler; -import org.apache.slider.server.appmaster.operations.RMOperationHandler; -import org.apache.slider.server.appmaster.rpc.RpcBinder; -import org.apache.slider.server.appmaster.rpc.SliderClusterProtocolPBImpl; -import org.apache.slider.server.appmaster.rpc.SliderIPCService; -import org.apache.slider.server.appmaster.state.AppState; -import org.apache.slider.server.appmaster.state.AppStateBindingInfo; -import org.apache.slider.server.appmaster.state.ContainerAssignment; -import org.apache.slider.server.appmaster.state.MostRecentContainerReleaseSelector; -import org.apache.slider.server.appmaster.state.ProviderAppState; -import org.apache.slider.server.appmaster.state.RoleInstance; -import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; -import org.apache.hadoop.yarn.service.timelineservice.ServiceMetricsSink; -import org.apache.slider.server.appmaster.web.SliderAMWebApp; -import org.apache.slider.server.appmaster.web.WebAppApi; -import org.apache.slider.server.appmaster.web.WebAppApiImpl; -import org.apache.slider.server.appmaster.web.rest.InsecureAmFilterInitializer; -import org.apache.slider.server.appmaster.web.rest.RestPaths; -import org.apache.slider.server.appmaster.web.rest.application.ApplicationResouceContentCacheFactory; -import org.apache.slider.server.appmaster.web.rest.application.resources.ContentCache; -import org.apache.slider.server.services.utility.AbstractSliderLaunchedService; -import org.apache.slider.server.services.utility.WebAppService; -import org.apache.slider.server.services.workflow.ServiceThreadFactory; -import org.apache.slider.server.services.workflow.WorkflowExecutorService; -import org.apache.slider.server.services.workflow.WorkflowRpcService; -import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders; -import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URL; -import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import static org.apache.hadoop.yarn.conf.YarnConfiguration.*; -import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG; - -/** - * This is the AM, which directly implements the callbacks from the AM and NM - */ -public class SliderAppMaster extends AbstractSliderLaunchedService - implements AMRMClientAsync.CallbackHandler, - NMClientAsync.CallbackHandler, - RunService, - SliderExitCodes, - SliderKeys, - ServiceStateChangeListener, - RoleKeys, - ProviderCompleted, - AppMasterActionOperations { - - protected static final Logger log = - LoggerFactory.getLogger(SliderAppMaster.class); - - /** - * log for YARN events - */ - protected static final Logger LOG_YARN = log; - - public static final String SERVICE_CLASSNAME_SHORT = "SliderAppMaster"; - public static final String SERVICE_CLASSNAME = - "org.apache.slider.server.appmaster." + SERVICE_CLASSNAME_SHORT; - - public static final int HEARTBEAT_INTERVAL = 1000; - public static final int NUM_RPC_HANDLERS = 5; - - /** - * Metrics and monitoring services. - * Deployed in {@link #serviceInit(Configuration)} - */ - private final MetricsAndMonitoring metricsAndMonitoring = new MetricsAndMonitoring(); - - /** - * metrics registry - */ - public MetricRegistry metrics; - - /** Error string on chaos monkey launch failure action: {@value} */ - public static final String E_TRIGGERED_LAUNCH_FAILURE = - "Chaos monkey triggered launch failure"; - - /** YARN RPC to communicate with the Resource Manager or Node Manager */ - private YarnRPC yarnRPC; - - /** Handle to communicate with the Resource Manager*/ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private AMRMClientAsync asyncRMClient; - - /** Handle to communicate with the timeline service */ - private TimelineV2Client timelineClient; - - private boolean timelineServiceEnabled = false; - - ServiceTimelinePublisher serviceTimelinePublisher; - - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private RMOperationHandler rmOperationHandler; - - /** Handle to communicate with the Node Manager*/ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - public NMClientAsync nmClientAsync; - - /** - * Credentials for propagating down to launched containers - */ - private Credentials containerCredentials = new Credentials(); - - /** - * Slider IPC: Real service handler - */ - private SliderIPCService sliderIPCService; - /** - * Slider IPC: binding - */ - private WorkflowRpcService rpcService; - - /** - * Secret manager - */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private ClientToAMTokenSecretManager secretManager; - - /** Hostname of the container*/ - private String appMasterHostname = ""; - /* Port on which the app master listens for status updates from clients*/ - private int appMasterRpcPort = 0; - /** Tracking url to which app master publishes info for clients to monitor*/ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private String appMasterTrackingUrl = ""; - - /** Proxied app master URL (as retrieved from AM report at launch time) */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private String appMasterProxiedUrl = ""; - - /** Application Attempt Id ( combination of attemptId and fail count )*/ - private ApplicationAttemptId appAttemptID; - - /** - * App ACLs - */ - protected Map applicationACLs; - - /** - * Ongoing state of the cluster: containers, nodes they - * live on, etc. - */ - private final AppState appState = - new AppState(new ProtobufClusterServices(), metricsAndMonitoring); - - /** - * App state for external objects. This is almost entirely - * a read-only view of the application state. To change the state, - * Providers (or anything else) are expected to queue async changes. - */ - private final ProviderAppState stateForProviders = - new ProviderAppState("undefined", appState); - - /** - * model the state using locks and conditions - */ - private final ReentrantLock AMExecutionStateLock = new ReentrantLock(); - private final Condition isAMCompleted = AMExecutionStateLock.newCondition(); - - /** - * Flag set if the AM is to be shutdown - */ - private final AtomicBoolean amCompletionFlag = new AtomicBoolean(false); - - /** - * Flag set during the init process - */ - private final AtomicBoolean initCompleted = new AtomicBoolean(false); - - /** Arguments passed in : raw*/ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private SliderAMArgs serviceArgs; - - /** - * ID of the AM container - */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private ContainerId appMasterContainerID; - - /** - * Monkey Service -may be null - */ - private ChaosMonkeyService monkey; - - /** - * ProviderService of this cluster - */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private Set providers = new HashSet<>(); - - /** - * The YARN registry service - */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private RegistryOperations registryOperations; - - /** - * The stop request received...the exit details are extracted - * from this - */ - private volatile ActionStopSlider stopAction; - - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private RoleLaunchService launchService; - - //username -null if it is not known/not to be set - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private String hadoop_user_name; - private String service_user_name; - - private SliderAMWebApp webApp; - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private InetSocketAddress rpcServiceAddress; - - /** - * Executor. - * Assigned in {@link #serviceInit(Configuration)} - */ - private WorkflowExecutorService executorService; - - /** - * Action queues. Created at instance creation, but - * added as a child and inited in {@link #serviceInit(Configuration)} - */ - private final QueueService actionQueues = new QueueService(); - private YarnRegistryViewForProviders yarnRegistryOperations; - //private FsDelegationTokenManager fsDelegationTokenManager; - private RegisterApplicationMasterResponse amRegistrationData; - private PortScanner portScanner; - - /** - * Is security enabled? - * Set early on in the {@link #createAndRunCluster(String)} operation. - */ - private boolean securityEnabled; - private ContentCache contentCache; - - /** - * resource limits - */ - private Resource maximumResourceCapability; - private Application application; - /** - * Service Constructor - */ - public SliderAppMaster() { - super(SERVICE_CLASSNAME_SHORT); - new HdfsConfiguration(); - new YarnConfiguration(); - } - -/* =================================================================== */ -/* service lifecycle methods */ -/* =================================================================== */ - - @Override //AbstractService - public synchronized void serviceInit(Configuration conf) throws Exception { - // slider client if found - - Configuration customConf = SliderUtils.loadSliderClientXML(); - // Load in the server configuration - if it is actually on the Classpath - URL serverXmlUrl = ConfigHelper.getResourceUrl(SLIDER_SERVER_XML); - if (serverXmlUrl != null) { - log.info("Loading {} at {}", SLIDER_SERVER_XML, serverXmlUrl); - Configuration serverConf = ConfigHelper.loadFromResource(SLIDER_SERVER_XML); - ConfigHelper.mergeConfigurations(customConf, serverConf, - SLIDER_SERVER_XML, true); - } - serviceArgs.applyDefinitions(customConf); - serviceArgs.applyFileSystemBinding(customConf); - // conf now contains all customizations - - AbstractActionArgs action = serviceArgs.getCoreAction(); - SliderAMCreateAction createAction = (SliderAMCreateAction) action; - - // sort out the location of the AM - String rmAddress = createAction.getRmAddress(); - if (rmAddress != null) { - log.debug("Setting RM address from the command line: {}", rmAddress); - SliderUtils.setRmSchedulerAddress(customConf, rmAddress); - } - - log.info("AM configuration:\n{}", - ConfigHelper.dumpConfigToString(customConf)); - for (Map.Entry envs : System.getenv().entrySet()) { - log.info("System env {}={}", envs.getKey(), envs.getValue()); - } - - ConfigHelper.mergeConfigurations(conf, customConf, SLIDER_CLIENT_XML, true); - //init security with our conf - if (SliderUtils.isHadoopClusterSecure(conf)) { - log.info("Secure mode with kerberos realm {}", - SliderUtils.getKerberosRealm()); - UserGroupInformation.setConfiguration(conf); - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - log.debug("Authenticating as {}", ugi); - SliderUtils.verifyPrincipalSet(conf, DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY); - } else { - log.info("Cluster is insecure"); - } - log.info("Login user is {}", UserGroupInformation.getLoginUser()); - - //look at settings of Hadoop Auth, to pick up a problem seen once - checkAndWarnForAuthTokenProblems(); - - // validate server env - boolean dependencyChecks = - !conf.getBoolean(KEY_SLIDER_AM_DEPENDENCY_CHECKS_DISABLED, - false); - SliderUtils.validateSliderServerEnvironment(log, dependencyChecks); - - // create and register monitoring services - addService(metricsAndMonitoring); - metrics = metricsAndMonitoring.getMetrics(); -/* TODO: turn these one once the metrics testing is more under control - metrics.registerAll(new ThreadStatesGaugeSet()); - metrics.registerAll(new MemoryUsageGaugeSet()); - metrics.registerAll(new GarbageCollectorMetricSet()); - -*/ - contentCache = ApplicationResouceContentCacheFactory.createContentCache(stateForProviders); - - executorService = new WorkflowExecutorService<>("AmExecutor", - Executors.newFixedThreadPool(2, - new ServiceThreadFactory("AmExecutor", true))); - addService(executorService); - - addService(actionQueues); - if (YarnConfiguration.timelineServiceV2Enabled(conf)) { - timelineServiceEnabled = true; - log.info("Enabled YARN timeline service v2. "); - } - - //init all child services - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - super.serviceStart(); - HealthCheckRegistry health = metricsAndMonitoring.getHealth(); - health.register("AM Health", new YarnServiceHealthCheck(this)); - } - - /** - * Start the queue processing - */ - private void startQueueProcessing() { - log.info("Queue Processing started"); - executorService.execute(actionQueues); - executorService.execute(new QueueExecutor(this, actionQueues)); - } - -/* =================================================================== */ -/* RunService methods called from ServiceLauncher */ -/* =================================================================== */ - - /** - * pick up the args from the service launcher - * @param config configuration - * @param args argument list - */ - @Override // RunService - public Configuration bindArgs(Configuration config, String... args) throws Exception { - // let the superclass process it - Configuration superConf = super.bindArgs(config, args); - - //yarn-ify - YarnConfiguration yarnConfiguration = new YarnConfiguration( - superConf); - serviceArgs = new SliderAMArgs(args); - serviceArgs.parse(); - - return SliderUtils.patchConfiguration(yarnConfiguration); - } - - - /** - * this is called by service launcher; when it returns the application finishes - * @return the exit code to return by the app - * @throws Throwable - */ - @Override - public int runService() throws Throwable { - SliderVersionInfo.loadAndPrintVersionInfo(log); - - //dump the system properties if in debug mode - if (log.isDebugEnabled()) { - log.debug("System properties:\n" + SliderUtils.propertiesToString(System.getProperties())); - } - - //choose the action - String action = serviceArgs.getAction(); - List actionArgs = serviceArgs.getActionArgs(); - int exitCode; - switch (action) { - case SliderActions.ACTION_HELP: - log.info("{}: {}", getName(), serviceArgs.usage()); - exitCode = SliderExitCodes.EXIT_USAGE; - break; - case SliderActions.ACTION_CREATE: - exitCode = createAndRunCluster(actionArgs.get(0)); - break; - default: - throw new SliderException("Unimplemented: " + action); - } - log.info("Exiting AM; final exit code = {}", exitCode); - return exitCode; - } - - /** - * Initialize a newly created service then add it. - * Because the service is not started, this MUST be done before - * the AM itself starts, or it is explicitly added after - * @param service the service to init - */ - public Service initAndAddService(Service service) { - service.init(getConfig()); - addService(service); - return service; - } - - /* =================================================================== */ - - /** - * Create and run the cluster. - * @param appName cluster name - * @return exit code - * @throws Throwable on a failure - */ - private int createAndRunCluster(String appName) throws Throwable { - Path appDir = new Path((serviceArgs.getAppDefPath())); - SliderFileSystem fs = getClusterFS(); - fs.setAppDir(appDir); - application = ServiceApiUtil.loadApplication(fs, appName); - log.info("Application Json: " + application); - stateForProviders.setApplicationName(appName); - Configuration serviceConf = getConfig(); - - // obtain security state - // set the global security flag for the instance definition - - // initialize our providers - for (Component component : application.getComponents()) { - ProviderFactory factory = ProviderFactory - .createSliderProviderFactory(component.getArtifact()); - ProviderService providerService = factory.createServerProvider(); - // init the provider BUT DO NOT START IT YET -// initAndAddService(providerService); - providers.add(providerService); - } - - InetSocketAddress rmSchedulerAddress = SliderUtils.getRmSchedulerAddress(serviceConf); - log.info("RM is at {}", rmSchedulerAddress); - yarnRPC = YarnRPC.create(serviceConf); - - // set up the YARN client. This may require patching in the RM client-API address if it - // is (somehow) unset server-side. String clientRMaddr = serviceConf.get(YarnConfiguration.RM_ADDRESS); - InetSocketAddress clientRpcAddress = SliderUtils.getRmAddress(serviceConf); - if (!SliderUtils.isAddressDefined(clientRpcAddress)) { - // client addr is being unset. We can lift it from the other RM APIs - log.warn("Yarn RM address was unbound; attempting to fix up"); - serviceConf.set(YarnConfiguration.RM_ADDRESS, - String.format("%s:%d", rmSchedulerAddress.getHostString(), clientRpcAddress.getPort() )); - } - - /* - * Extract the container ID. This is then - * turned into an (incomplete) container - */ - appMasterContainerID = ConverterUtils.toContainerId( - SliderUtils.mandatoryEnvVariable(ApplicationConstants.Environment.CONTAINER_ID.name())); - appAttemptID = appMasterContainerID.getApplicationAttemptId(); - - ApplicationId appid = appAttemptID.getApplicationId(); - log.info("AM for ID {}", appid.getId()); - - Map envVars; - List liveContainers; - - /* - * It is critical this section is synchronized, to stop async AM events - * arriving while registering a restarting AM. - */ - synchronized (appState) { - int heartbeatInterval = HEARTBEAT_INTERVAL; - - // configure AM to wait forever for RM - getConfig().setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, - -1); - getConfig().unset(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS); - - // add the RM client -this brings the callbacks in - asyncRMClient = AMRMClientAsync.createAMRMClientAsync(heartbeatInterval, this); - addService(asyncRMClient); - //now bring it up - deployChildService(asyncRMClient); - - if (timelineServiceEnabled) { - timelineClient = TimelineV2Client.createTimelineClient(appid); - asyncRMClient.registerTimelineV2Client(timelineClient); - timelineClient.init(getConfig()); - timelineClient.start(); - log.info("Timeline v2 client started."); - - serviceTimelinePublisher = new ServiceTimelinePublisher(timelineClient); - serviceTimelinePublisher.init(getConfig()); - serviceTimelinePublisher.start(); - - for (ProviderService providerService : providers) { - } - appState.setServiceTimelinePublisher(serviceTimelinePublisher); - log.info("ServiceTimelinePublisher started."); - } - - - // nmclient relays callbacks back to this class - nmClientAsync = new NMClientAsyncImpl("nmclient", this); - deployChildService(nmClientAsync); - - // set up secret manager - secretManager = new ClientToAMTokenSecretManager(appAttemptID, null); - - if (securityEnabled) { - // fix up the ACLs if they are not set - String acls = serviceConf.get(KEY_PROTOCOL_ACL); - if (acls == null) { - getConfig().set(KEY_PROTOCOL_ACL, "*"); - } - } - - //bring up the Slider RPC service - buildPortScanner(); - startSliderRPCServer(); - - rpcServiceAddress = rpcService.getConnectAddress(); - appMasterHostname = rpcServiceAddress.getAddress().getCanonicalHostName(); - appMasterRpcPort = rpcServiceAddress.getPort(); - appMasterTrackingUrl = null; - log.info("AM Server is listening at {}:{}", appMasterHostname, appMasterRpcPort); - - log.info("Starting Yarn registry"); - registryOperations = startRegistryOperationsService(); - log.info(registryOperations.toString()); - - // Start up the WebApp and track the URL for it - // Web service endpoints: initialize - WebAppApiImpl webAppApi = - new WebAppApiImpl( - stateForProviders, - registryOperations, - metricsAndMonitoring, - actionQueues); - initAMFilterOptions(serviceConf); - - int webAppPort = deployWebApplication(webAppApi); - - String scheme = WebAppUtils.HTTP_PREFIX; - appMasterTrackingUrl = scheme + appMasterHostname + ":" + webAppPort; - - // ***************************************************** - // Register self with ResourceManager - // This will start heartbeating to the RM - // address = SliderUtils.getRmSchedulerAddress(asyncRMClient.getConfig()); - // ***************************************************** - log.info("Connecting to RM at {}; AM tracking URL={}", - appMasterRpcPort, appMasterTrackingUrl); - amRegistrationData = asyncRMClient.registerApplicationMaster(appMasterHostname, - appMasterRpcPort, - appMasterTrackingUrl); - maximumResourceCapability = amRegistrationData.getMaximumResourceCapability(); - - //TODO should not read local configs !!! - int minMemory = serviceConf.getInt(RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); - // validate scheduler vcores allocation setting - int minCores = serviceConf.getInt(RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, - DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); - int maxMemory = maximumResourceCapability.getMemory(); - int maxCores = maximumResourceCapability.getVirtualCores(); - appState.setContainerLimits(minMemory,maxMemory, minCores, maxCores ); - - // build the handler for RM request/release operations; this uses - // the max value as part of its lookup - rmOperationHandler = new AsyncRMOperationHandler(asyncRMClient, maximumResourceCapability); - - stripAMRMToken(); - -// if (securityEnabled) { -// secretManager.setMasterKey( -// amRegistrationData.getClientToAMTokenMasterKey().array()); -// applicationACLs = amRegistrationData.getApplicationACLs(); -// -// //tell the server what the ACLs are -// rpcService.getServer().refreshServiceAcl(serviceConf, -// new SliderAMPolicyProvider()); -// if (securityConfiguration.isKeytabProvided()) { -// // perform keytab based login to establish kerberos authenticated -// // principal. Can do so now since AM registration with RM above required -// // tokens associated to principal -// String principal = securityConfiguration.getPrincipal(); -// //TODO read key tab file from slider-am.xml -// File localKeytabFile = new File("todo"); -//// securityConfiguration.getKeytabFile(new AggregateConf()); -// // Now log in... -// login(principal, localKeytabFile); -// // obtain new FS reference that should be kerberos based and different -// // than the previously cached reference -// fs = new SliderFileSystem(serviceConf); -// } -// } - - // YARN client. - // Important: this is only valid at startup, and must be executed within - // the right UGI context. Use with care. - YarnClient yarnClient = null; - List nodeReports; - try { - yarnClient = YarnClient.createYarnClient(); - yarnClient.init(getConfig()); - yarnClient.start(); - nodeReports = getNodeReports(yarnClient); - log.info("Yarn node report count: {}", nodeReports.size()); - // look up the application itself -this is needed to get the proxied - // URL of the AM, for registering endpoints. - // this call must be made after the AM has registered itself, obviously - ApplicationAttemptReport report = getApplicationAttemptReport(yarnClient); - appMasterProxiedUrl = report.getTrackingUrl(); - if (SliderUtils.isUnset(appMasterProxiedUrl)) { - log.warn("Proxied URL is not set in application report"); - appMasterProxiedUrl = appMasterTrackingUrl; - } - } finally { - // at this point yarnClient is no longer needed. - // stop it immediately - ServiceOperations.stop(yarnClient); - yarnClient = null; - } - - // extract container list - - liveContainers = amRegistrationData.getContainersFromPreviousAttempts(); - DefaultMetricsSystem.initialize("SliderAppMaster"); - if (timelineServiceEnabled) { - DefaultMetricsSystem.instance().register("ServiceMetricsSink", - "For processing metrics to ATS", - new ServiceMetricsSink(serviceTimelinePublisher)); - log.info("ServiceMetricsSink registered."); - } - - //determine the location for the role history data - Path historyDir = new Path(appDir, HISTORY_DIR_NAME); - - //build the instance - AppStateBindingInfo binding = new AppStateBindingInfo(); - binding.serviceConfig = null; - binding.fs = fs.getFileSystem(); - binding.historyPath = historyDir; - binding.liveContainers = liveContainers; - binding.releaseSelector = new MostRecentContainerReleaseSelector(); - binding.nodeReports = nodeReports; - binding.application = application; - binding.serviceHdfsDir = new Path(fs.buildClusterDirPath(appName), - SliderKeys.DATA_DIR_NAME).toString(); - appState.buildInstance(binding); - - // build up environment variables that the AM wants set in every container - // irrespective of provider and role. - envVars = new HashMap<>(); - if (hadoop_user_name != null) { - envVars.put(HADOOP_USER_NAME, hadoop_user_name); - } - String debug_kerberos = System.getenv(HADOOP_JAAS_DEBUG); - if (debug_kerberos != null) { - envVars.put(HADOOP_JAAS_DEBUG, debug_kerberos); - } - } - String rolesTmpSubdir = appMasterContainerID.toString() + "/roles"; - - String amTmpDir = "/tmp"; - //TODO read tmpDir from slider-am.xml - Path tmpDirPath = new Path(amTmpDir); - Path launcherTmpDirPath = new Path(tmpDirPath, rolesTmpSubdir); - fs.getFileSystem().mkdirs(launcherTmpDirPath); - - //launcher service - launchService = new RoleLaunchService(actionQueues, - fs, envVars); - - deployChildService(launchService); - - //Give the provider access to the state, and AM - for (ProviderService providerService : providers) { -// providerService.setAMState(stateForProviders); - } - - // chaos monkey - maybeStartMonkey(); - - // if not a secure cluster, extract the username -it will be - // propagated to workers - if (!UserGroupInformation.isSecurityEnabled()) { - hadoop_user_name = System.getenv(HADOOP_USER_NAME); - log.info(HADOOP_USER_NAME + "='{}'", hadoop_user_name); - } - service_user_name = RegistryUtils.currentUser(); - log.info("Registry service username ={}", service_user_name); - - - // declare the cluster initialized - log.info("Application Master Initialization Completed"); - initCompleted.set(true); - - scheduleFailureWindowResets(application.getConfiguration()); - scheduleEscalation(application.getConfiguration()); - scheduleMonitoring(application.getConfiguration()); - - try { - // schedule YARN Registry registration - queue(new ActionRegisterServiceInstance(appName, appid, application)); - - // log the YARN and web UIs - log.info("RM Webapp address {}", - serviceConf.get(YarnConfiguration.RM_WEBAPP_ADDRESS)); - log.info("Slider webapp address {} proxied at {}", - appMasterTrackingUrl, appMasterProxiedUrl); - // launch the real provider; this is expected to trigger a callback that - // starts the node review process - launchProviderService(); - - // start handling any scheduled events - - startQueueProcessing(); - - //now block waiting to be told to exit the process - waitForAMCompletionSignal(); - } catch(Exception e) { - log.error("Exception : {}", e, e); - // call the AM stop command as if it had been queued (but without - // going via the queue, which may not have started - onAMStop(new ActionStopSlider(e)); - } - //shutdown time - return finish(); - } - - /** - * Get the YARN application Attempt report as the logged in user - * @param yarnClient client to the RM - * @return the application report - * @throws YarnException - * @throws IOException - * @throws InterruptedException - */ - private ApplicationAttemptReport getApplicationAttemptReport( - final YarnClient yarnClient) - throws YarnException, IOException, InterruptedException { - Preconditions.checkNotNull(yarnClient, "Null Yarn client"); - ApplicationAttemptReport report; - if (securityEnabled) { - UserGroupInformation ugi = UserGroupInformation.getLoginUser(); - report = ugi.doAs(new PrivilegedExceptionAction() { - @Override - public ApplicationAttemptReport run() throws Exception { - return yarnClient.getApplicationAttemptReport(appAttemptID); - } - }); - } else { - report = yarnClient.getApplicationAttemptReport(appAttemptID); - } - return report; - } - - /** - * List the node reports: uses {@link YarnClient} as the login user - * @param yarnClient client to the RM - * @return the node reports - * @throws IOException - * @throws YarnException - * @throws InterruptedException - */ - private List getNodeReports(final YarnClient yarnClient) - throws IOException, YarnException, InterruptedException { - Preconditions.checkNotNull(yarnClient, "Null Yarn client"); - List nodeReports; - if (securityEnabled) { - nodeReports = UserGroupInformation.getLoginUser().doAs( - new PrivilegedExceptionAction>() { - @Override - public List run() throws Exception { - return yarnClient.getNodeReports(NodeState.RUNNING); - } - }); - } else { - nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); - } - log.info("Yarn node report count: {}", nodeReports.size()); - return nodeReports; - } - - /** - * Deploy the web application. - *

- * Creates and starts the web application, and adds a - * WebAppService service under the AM, to ensure - * a managed web application shutdown. - * @param webAppApi web application API instance - * @return port the web application is deployed on - * @throws IOException general problems starting the webapp (network, etc) - * @throws WebAppException other issues - */ - private int deployWebApplication(WebAppApiImpl webAppApi) - throws IOException, SliderException { - - try { - webApp = new SliderAMWebApp(webAppApi); - HttpConfig.Policy policy = HttpConfig.Policy.HTTP_ONLY; - int port = getPortToRequest(); - log.info("Launching web application at port {} with policy {}", port, policy); - - WebApps.$for(SliderAMWebApp.BASE_PATH, - WebAppApi.class, - webAppApi, - RestPaths.WS_CONTEXT) - .withHttpPolicy(getConfig(), policy) - .at("0.0.0.0", port, true) - .inDevMode() - .start(webApp); - - WebAppService webAppService = - new WebAppService<>("slider", webApp); - - deployChildService(webAppService); - return webApp.port(); - } catch (WebAppException e) { - if (e.getCause() instanceof IOException) { - throw (IOException)e.getCause(); - } else { - throw e; - } - } - } - - /** - * Process the initial user to obtain the set of user - * supplied credentials (tokens were passed in by client). - * Removes the AM/RM token. - * @throws IOException - */ - private void stripAMRMToken() - throws IOException { - List filteredTokens = new ArrayList<>(3); - filteredTokens.add(AMRMTokenIdentifier.KIND_NAME); - containerCredentials = CredentialUtils.filterTokens( - UserGroupInformation.getCurrentUser().getCredentials(), - filteredTokens); - log.info(CredentialUtils.dumpTokens(containerCredentials, "\n")); - } - - /** - * Build up the port scanner. This may include setting a port range. - */ - private void buildPortScanner() - throws BadConfigException { - portScanner = new PortScanner(); - String portRange = "0"; - //TODO read from slider-am.xml -// String portRange = instanceDefinition. -// getAppConfOperations().getGlobalOptions(). -// getOption(SliderKeys.KEY_ALLOWED_PORT_RANGE, "0"); - if (!"0".equals(portRange)) { - portScanner.setPortRange(portRange); - } - } - - /** - * Locate a port to request for a service such as RPC or web/REST. - * This uses port range definitions in the instanceDefinition - * to fix the port range —if one is set. - *

- * The port returned is available at the time of the request; there are - * no guarantees as to how long that situation will last. - * @return the port to request. - * @throws SliderException - */ - private int getPortToRequest() throws SliderException, IOException { - return portScanner.getAvailablePort(); - } - - protected void login(String principal, File localKeytabFile) - throws IOException, SliderException { - log.info("Logging in as {} with keytab {}", principal, localKeytabFile); - UserGroupInformation.loginUserFromKeytab(principal, - localKeytabFile.getAbsolutePath()); - validateLoginUser(UserGroupInformation.getLoginUser()); - } - - /** - * Ensure that the user is generated from a keytab and has no HDFS delegation - * tokens. - * - * @param user user to validate - * @throws SliderException - */ - protected void validateLoginUser(UserGroupInformation user) - throws SliderException { - if (!user.isFromKeytab()) { - log.error("User is not holding on a keytab in a secure deployment:" + - " slider will fail as tokens expire"); - } - Credentials credentials = user.getCredentials(); - Iterator> iter = - credentials.getAllTokens().iterator(); - while (iter.hasNext()) { - Token token = iter.next(); - log.info("Token {}", token.getKind()); - if (token.getKind().equals( - DelegationTokenIdentifier.HDFS_DELEGATION_KIND)) { - log.info("HDFS delegation token {}. Removing...", token); - iter.remove(); - } - } - } - - /** - * Set up the AM filter - * @param serviceConf configuration to patch - */ - private void initAMFilterOptions(Configuration serviceConf) { - // IP filtering - String amFilterName = AM_FILTER_NAME; - - // This is here until YARN supports proxy & redirect operations - // on verbs other than GET, and is only supported for testing - if (X_DEV_INSECURE_REQUIRED && serviceConf.getBoolean(X_DEV_INSECURE_WS, - X_DEV_INSECURE_DEFAULT)) { - log.warn("Insecure filter enabled: REST operations are unauthenticated"); - amFilterName = InsecureAmFilterInitializer.NAME; - } - - serviceConf.set(HADOOP_HTTP_FILTER_INITIALIZERS, amFilterName); - } - - /** - * This registers the service instance and its external values - * @param instanceName name of this instance - * @param appId application ID - * @throws IOException - */ - public void registerServiceInstance(String instanceName, - ApplicationId appId, Application application) throws IOException { - - //Give the provider restricted access to the state, registry - setupInitialRegistryPaths(); - yarnRegistryOperations = new YarnRegistryViewForProviders( - registryOperations, - service_user_name, - SliderKeys.APP_TYPE, - instanceName, - appAttemptID); - for (ProviderService providerService : providers) { -// providerService.bindToYarnRegistry(yarnRegistryOperations); - } - - // Yarn registry - ServiceRecord serviceRecord = new ServiceRecord(); - serviceRecord.set(YarnRegistryAttributes.YARN_ID, appId.toString()); - serviceRecord.set(YarnRegistryAttributes.YARN_PERSISTENCE, - PersistencePolicies.APPLICATION); - serviceRecord.description = "Slider Application Master"; - - serviceRecord.addExternalEndpoint( - RegistryTypeUtils.ipcEndpoint( - CustomRegistryConstants.AM_IPC_PROTOCOL, - rpcServiceAddress)); - - // set any provided attributes - setUserProvidedServiceRecordAttributes(application.getConfiguration(), - serviceRecord); - - // register the service's entry - log.info("Service Record \n{}", serviceRecord); - yarnRegistryOperations.registerSelf(serviceRecord, true); - log.info("Registered service under {}; absolute path {}", - yarnRegistryOperations.getSelfRegistrationPath(), - yarnRegistryOperations.getAbsoluteSelfRegistrationPath()); - - boolean isFirstAttempt = 1 == appAttemptID.getAttemptId(); - // delete the children in case there are any and this is an AM startup. - // just to make sure everything underneath is purged - if (isFirstAttempt) { - yarnRegistryOperations.deleteChildren( - yarnRegistryOperations.getSelfRegistrationPath(), - true); - } - if (timelineServiceEnabled) { - serviceTimelinePublisher.serviceAttemptRegistered(application); - } - } - - /** - * TODO: purge this once RM is doing the work - * @throws IOException - */ - protected void setupInitialRegistryPaths() throws IOException { - if (registryOperations instanceof RMRegistryOperationsService) { - RMRegistryOperationsService rmRegOperations = - (RMRegistryOperationsService) registryOperations; - rmRegOperations.initUserRegistryAsync(service_user_name); - } - } - - /** - * Handler for {@link RegisterComponentInstance action} - * Register/re-register an ephemeral container that is already in the application state - * @param id the component - * @return true if the component is registered - */ - public boolean registerComponent(ContainerId id, RoleInstance roleInstance) - throws IOException { - RoleInstance instance = appState.getOwnedContainer(id); - if (instance == null) { - return false; - } - // this is where component registrations go - log.info("Registering component " + roleInstance.getCompInstanceName() - + ", containerId = " + id); - org.apache.slider.api.resource.Container container = - new org.apache.slider.api.resource.Container(); - container.setId(id.toString()); - container.setLaunchTime(new Date()); - container.setState(org.apache.slider.api.resource.ContainerState.RUNNING_BUT_UNREADY); - container.setBareHost(instance.host); - // TODO differentiate component name and component instance name ? - container.setComponentName(roleInstance.getCompInstanceName()); - instance.providerRole.component.addContainer(container); - - if (timelineServiceEnabled) { - serviceTimelinePublisher.componentInstanceStarted(container, null); - } - return true; - } - - protected void setUserProvidedServiceRecordAttributes( - org.apache.slider.api.resource.Configuration conf, ServiceRecord record) { - String prefix = RoleKeys.SERVICE_RECORD_ATTRIBUTE_PREFIX; - for (Map.Entry entry : conf.getProperties().entrySet()) { - if (entry.getKey().startsWith(prefix)) { - String key = entry.getKey().substring(prefix.length() + 1); - record.set(key, entry.getValue().trim()); - } - } - } - - /** - * Handler for {@link UnregisterComponentInstance} - * - * unregister a component. At the time this message is received, - * the component may not have been registered - */ - public void unregisterComponent(RoleInstance roleInstance) { - ContainerId containerId = roleInstance.getContainerId(); - log.info( - "Unregistering component instance " + roleInstance.getCompInstanceName() - + ", ContainerId = " + containerId); - if (yarnRegistryOperations == null) { - log.warn("Processing unregister component event before initialization " - + "completed; init flag ={}", initCompleted); - return; - } - String cid = RegistryPathUtils.encodeYarnID(containerId.toString()); -// try { -// yarnRegistryOperations.deleteComponent(cid); -// } catch (IOException e) { -// log.warn("Failed to delete container {} : {}", containerId, e, e); -// } - - // remove component instance dir - try { - FileSystem fs = getClusterFS().getFileSystem(); - if (roleInstance.compInstanceDir != null && fs - .exists(roleInstance.compInstanceDir)) { - boolean deleted = fs.delete(roleInstance.compInstanceDir, true); - if (!deleted) { - log.warn("Failed to delete component instance dir: " - + roleInstance.compInstanceDir); - } - } - } catch (IOException e) { - log.error("Failed to delete component instance dir: " - + roleInstance.compInstanceDir, e); - } - } - - /** - * looks for a specific case where a token file is provided as an environment - * variable, yet the file is not there. - * - * This surfaced (once) in HBase, where its HDFS library was looking for this, - * and somehow the token was missing. This is a check in the AM so that - * if the problem re-occurs, the AM can fail with a more meaningful message. - * - */ - private void checkAndWarnForAuthTokenProblems() { - String fileLocation = - System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); - if (fileLocation != null) { - File tokenFile = new File(fileLocation); - if (!tokenFile.exists()) { - log.warn("Token file {} specified in {} not found", tokenFile, - UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); - } - } - } - - /** - * Get the filesystem of this cluster - * @return the FS of the config - */ - public SliderFileSystem getClusterFS() throws IOException { - return new SliderFileSystem(getConfig()); - } - - /** - * Get the AM log - * @return the log of the AM - */ - public static Logger getLog() { - return log; - } - - /** - * Get the application state - * @return the application state - */ - public AppState getAppState() { - return appState; - } - - /** - * Block until it is signalled that the AM is done - */ - private void waitForAMCompletionSignal() { - AMExecutionStateLock.lock(); - try { - if (!amCompletionFlag.get()) { - log.debug("blocking until signalled to terminate"); - isAMCompleted.awaitUninterruptibly(); - } - } finally { - AMExecutionStateLock.unlock(); - } - } - - /** - * Signal that the AM is complete .. queues it in a separate thread - * - * @param stopActionRequest request containing shutdown details - */ - public synchronized void signalAMComplete(ActionStopSlider stopActionRequest) { - // this is a queued action: schedule it through the queues - schedule(stopActionRequest); - } - - /** - * Signal that the AM is complete - * - * @param stopActionRequest request containing shutdown details - */ - public synchronized void onAMStop(ActionStopSlider stopActionRequest) { - - AMExecutionStateLock.lock(); - try { - if (amCompletionFlag.compareAndSet(false, true)) { - // first stop request received - this.stopAction = stopActionRequest; - isAMCompleted.signal(); - } - } finally { - AMExecutionStateLock.unlock(); - } - } - - - /** - * trigger the YARN cluster termination process - * @return the exit code - * @throws Exception if the stop action contained an Exception which implements - * ExitCodeProvider - */ - private synchronized int finish() throws Exception { - Preconditions.checkNotNull(stopAction, "null stop action"); - FinalApplicationStatus appStatus; - log.info("Triggering shutdown of the AM: {}", stopAction); - - String appMessage = stopAction.getMessage(); - //stop the daemon & grab its exit code - int exitCode = stopAction.getExitCode(); - Exception exception = stopAction.getEx(); - - appStatus = stopAction.getFinalApplicationStatus(); - - // make sure the AM is actually registered. If not, there's no point - // trying to unregister it - if (amRegistrationData == null) { - log.info("Application attempt not yet registered; skipping unregistration"); - if (exception != null) { - throw exception; - } - return exitCode; - } - - //stop any launches in progress - launchService.stop(); - - //now release all containers - releaseAllContainers(application); - DefaultMetricsSystem.shutdown(); - - if (timelineServiceEnabled) { - serviceTimelinePublisher.serviceAttemptUnregistered(appState, stopAction); - serviceTimelinePublisher.stop(); - timelineClient.stop(); - } - - // When the application completes, it should send a finish application - // signal to the RM - log.info("Application completed. Signalling finish to RM"); - - try { - log.info("Unregistering AM status={} message={}", appStatus, appMessage); - asyncRMClient.unregisterApplicationMaster(appStatus, appMessage, null); - } catch (InvalidApplicationMasterRequestException e) { - log.info("Application not found in YARN application list;" + - " it may have been terminated/YARN shutdown in progress: {}", e, e); - } catch (YarnException | IOException e) { - log.info("Failed to unregister application: " + e, e); - } - if (exception != null) { - throw exception; - } - return exitCode; - } - - - public Object getProxy(Class protocol, InetSocketAddress addr) { - return yarnRPC.getProxy(protocol, addr, getConfig()); - } - - /** - * Start the slider RPC server - */ - private void startSliderRPCServer() - throws IOException, SliderException { - verifyIPCAccess(); - - sliderIPCService = new SliderIPCService( - this, stateForProviders, - actionQueues, - metricsAndMonitoring, - contentCache); - - deployChildService(sliderIPCService); - SliderClusterProtocolPBImpl protobufRelay = - new SliderClusterProtocolPBImpl(sliderIPCService); - BlockingService blockingService = SliderClusterAPI.SliderClusterProtocolPB - .newReflectiveBlockingService( - protobufRelay); - - int port = getPortToRequest(); - InetSocketAddress rpcAddress = new InetSocketAddress("0.0.0.0", port); - rpcService = - new WorkflowRpcService("SliderRPC", - RpcBinder.createProtobufServer(rpcAddress, getConfig(), - secretManager, - NUM_RPC_HANDLERS, - blockingService, - null)); - deployChildService(rpcService); - } - - /** - * verify that if the cluster is authed, the ACLs are set. - * @throws BadConfigException if Authorization is set without any ACL - */ - private void verifyIPCAccess() throws BadConfigException { - boolean authorization = getConfig().getBoolean( - CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, - false); - String acls = getConfig().get(KEY_PROTOCOL_ACL); - if (authorization && SliderUtils.isUnset(acls)) { - throw new BadConfigException("Application has IPC authorization enabled in " + - CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION + - " but no ACLs in " + KEY_PROTOCOL_ACL); - } - } - - -/* =================================================================== */ -/* AMRMClientAsync callbacks */ -/* =================================================================== */ - - /** - * Callback event when a container is allocated. - * - * The app state is updated with the allocation, and builds up a list - * of assignments and RM operations. The assignments are - * handed off into the pool of service launchers to asynchronously schedule - * container launch operations. - * - * The operations are run in sequence; they are expected to be 0 or more - * release operations (to handle over-allocations) - * - * @param allocatedContainers list of containers that are now ready to be - * given work. - */ - @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - @Override //AMRMClientAsync - public void onContainersAllocated(List allocatedContainers) { - LOG_YARN.info("onContainersAllocated({})", allocatedContainers.size()); - List assignments = new ArrayList<>(); - List operations = new ArrayList<>(); - - //app state makes all the decisions - appState.onContainersAllocated(allocatedContainers, assignments, operations); - - //for each assignment: instantiate that role - for (ContainerAssignment assignment : assignments) { - //TODO Do we need to pass credentials to containers? - launchService.launchRole(assignment, application, null); - } - - //for all the operations, exec them - execute(operations); - } - - @Override //AMRMClientAsync - public synchronized void onContainersCompleted(List completedContainers) { - LOG_YARN.info("onContainersCompleted([{}]", completedContainers.size()); - for (ContainerStatus status : completedContainers) { - ContainerId containerId = status.getContainerId(); - LOG_YARN.info("Container Completion for" + - " containerID={}," + - " state={}," + - " exitStatus={}," + - " diagnostics={}", - containerId, status.getState(), - status.getExitStatus(), - status.getDiagnostics()); - - // non complete containers should not be here - assert (status.getState() == ContainerState.COMPLETE); - AppState.NodeCompletionResult result = appState.onCompletedContainer(status); - if (result.containerFailed) { - RoleInstance ri = result.roleInstance; - log.error("Role instance {} failed ", ri); - } - - // known nodes trigger notifications - if(!result.unknownNode) { - queue(new UnregisterComponentInstance(0, - TimeUnit.MILLISECONDS, result.roleInstance)); - - if (timelineServiceEnabled && result.roleInstance != null) { - serviceTimelinePublisher - .componentInstanceFinished(result.roleInstance); - } - } - } - - reviewRequestAndReleaseNodes("onContainersCompleted"); - } - - /** - * Signal that containers are being upgraded. Containers specified with - * --containers option and all containers of all roles specified with - * --components option are merged and upgraded. - * - * @param upgradeContainersRequest - * request containing upgrade details - */ - public synchronized void onUpgradeContainers( - ActionUpgradeContainers upgradeContainersRequest) throws IOException, - SliderException { - LOG_YARN.info("onUpgradeContainers({})", - upgradeContainersRequest.getMessage()); - Set containers = upgradeContainersRequest.getContainers() == null ? new HashSet() - : upgradeContainersRequest.getContainers(); - LOG_YARN.info(" Container list provided (total {}) : {}", - containers.size(), containers); - Set components = upgradeContainersRequest.getComponents() == null ? new HashSet() - : upgradeContainersRequest.getComponents(); - LOG_YARN.info(" Component list provided (total {}) : {}", - components.size(), components); - // If components are specified as well, then grab all the containers of - // each of the components (roles) - if (CollectionUtils.isNotEmpty(components)) { - Map liveContainers = appState.getLiveContainers(); - if (CollectionUtils.isNotEmpty(liveContainers.keySet())) { - Map> roleContainerMap = prepareRoleContainerMap(liveContainers); - for (String component : components) { - Set roleContainers = roleContainerMap.get(component); - if (roleContainers != null) { - containers.addAll(roleContainers); - } - } - } - } - LOG_YARN.info("Final list of containers to be upgraded (total {}) : {}", - containers.size(), containers); - } - - // create a reverse map of roles -> set of all live containers - private Map> prepareRoleContainerMap( - Map liveContainers) { - // liveContainers is ensured to be not empty - Map> roleContainerMap = new HashMap<>(); - for (Map.Entry liveContainer : liveContainers - .entrySet()) { - RoleInstance role = liveContainer.getValue(); - if (roleContainerMap.containsKey(role.role)) { - roleContainerMap.get(role.role).add(liveContainer.getKey().toString()); - } else { - Set containers = new HashSet(); - containers.add(liveContainer.getKey().toString()); - roleContainerMap.put(role.role, containers); - } - } - return roleContainerMap; - } - - /** - * Implementation of cluster flexing. - * It should be the only way that anything -even the AM itself on startup- - * asks for nodes. - * @throws SliderException slider problems, including invalid configs - * @throws IOException IO problems - */ - public void flexCluster(Messages.FlexComponentsRequestProto request) - throws IOException, SliderException { - if (request != null) { - appState.updateComponents(request); - } - // reset the scheduled windows...the values - // may have changed - appState.resetFailureCounts(); - - // ask for more containers if needed - reviewRequestAndReleaseNodes("flexCluster"); - } - - /** - * Schedule the failure window - * @throws BadConfigException if the window is out of range - */ - private void scheduleFailureWindowResets( - org.apache.slider.api.resource.Configuration conf) { - - ResetFailureWindow reset = new ResetFailureWindow(rmOperationHandler); - - long totalSeconds = SliderUtils.getTimeRange(conf, - ResourceKeys.CONTAINER_FAILURE_WINDOW, - ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_DAYS, - ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_HOURS, - ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_MINUTES, - 0); - if (totalSeconds > 0) { - log.info("Scheduling the failure window reset interval to every {}" - + " seconds", totalSeconds); - RenewingAction renew = - new RenewingAction<>(reset, totalSeconds, totalSeconds, - TimeUnit.SECONDS, 0); - actionQueues.renewing("failures", renew); - } else { - log.info("Failure window reset interval is not set"); - } - } - - /** - * Schedule the escalation action - * @throws BadConfigException - */ - private void scheduleEscalation( - org.apache.slider.api.resource.Configuration conf) { - EscalateOutstandingRequests escalate = new EscalateOutstandingRequests(); - long seconds = conf.getPropertyLong(InternalKeys.ESCALATION_CHECK_INTERVAL, - InternalKeys.DEFAULT_ESCALATION_CHECK_INTERVAL); - RenewingAction renew = - new RenewingAction<>(escalate, seconds, seconds, TimeUnit.SECONDS, 0); - actionQueues.renewing("escalation", renew); - } - - /** - * Schedule monitor action - */ - private void scheduleMonitoring( - org.apache.slider.api.resource.Configuration conf) { - MonitorComponentInstances monitor = new MonitorComponentInstances(); - long seconds = conf.getPropertyLong(InternalKeys.MONITOR_INTERVAL, - InternalKeys.DEFAULT_MONITOR_INTERVAL); - RenewingAction renew = - new RenewingAction<>(monitor, seconds, seconds, TimeUnit.SECONDS, 0); - actionQueues.renewing("monitoring", renew); - } - - /** - * Look at where the current node state is and whether it should be changed. - * @param reason reason for operation - */ - private synchronized void reviewRequestAndReleaseNodes(String reason) { - log.info("reviewRequestAndReleaseNodes({})", reason); - queue(new ReviewAndFlexApplicationSize(reason, 0, TimeUnit.SECONDS)); - } - - /** - * Handle the event requesting a review ... look at the queue and decide - * whether to act or not - * @param action action triggering the event. It may be put - * back into the queue - * @throws SliderInternalStateException - */ - public void handleReviewAndFlexApplicationSize(ReviewAndFlexApplicationSize action) - throws SliderInternalStateException { - - if ( actionQueues.hasQueuedActionWithAttribute( - AsyncAction.ATTR_REVIEWS_APP_SIZE | AsyncAction.ATTR_HALTS_APP)) { - //TODO Loop all actions to check duplicate ?? - // this operation isn't needed at all -existing duplicate or shutdown due - return; - } - // if there is an action which changes cluster size, wait - if (actionQueues.hasQueuedActionWithAttribute( - AsyncAction.ATTR_CHANGES_APP_SIZE)) { - // place the action at the back of the queue - actionQueues.put(action); - } - - executeNodeReview(action.name); - } - - /** - * Look at where the current node state is -and whether it should be changed - */ - public synchronized void executeNodeReview(String reason) - throws SliderInternalStateException { - - log.info("in executeNodeReview({})", reason); - if (amCompletionFlag.get()) { - log.info("Ignoring node review operation: shutdown in progress"); - } - try { - List allOperations = appState.reviewRequestAndReleaseNodes(); - //now apply the operations - execute(allOperations); - } catch (TriggerClusterTeardownException e) { - //App state has decided that it is time to exit - log.error("Cluster teardown triggered {}", e, e); - queue(new ActionStopSlider(e)); - } - } - - /** - * Escalate operation as triggered by external timer. - *

- * Get the list of new operations off the AM, then executest them. - */ - public void escalateOutstandingRequests() { - List operations = appState.escalateOutstandingRequests(); - execute(operations); - } - - public void monitorComponentInstances() { - // TODO use health checks? - // TODO publish timeline events for monitoring changes? - if (appState.monitorComponentInstances()) { - // monitoring change - reviewRequestAndReleaseNodes("monitoring change"); - } - } - - - /** - * Shutdown operation: release all containers - */ - private void releaseAllContainers(Application application) { - // Add the sleep here (before releasing containers) so that applications get - // time to perform graceful shutdown - try { - long timeout = getContainerReleaseTimeout(application); - if (timeout > 0) { - Thread.sleep(timeout); - } - } catch (InterruptedException e) { - log.info("Sleep for container release interrupted"); - } finally { - List operations = appState.releaseAllContainers(); - // now apply the operations - execute(operations); - } - } - - private long getContainerReleaseTimeout(Application application) { - // Get container release timeout in millis or 0 if the property is not set. - long timeout = application.getConfiguration() - .getPropertyLong(SliderKeys.APP_CONTAINER_RELEASE_TIMEOUT, 0); - - // convert to millis - long timeoutInMillis = timeout * 1000l; - log.info("Container release timeout in millis = {}", timeoutInMillis); - return timeoutInMillis; - } - - /** - * RM wants to shut down the AM - */ - @Override //AMRMClientAsync - public void onShutdownRequest() { - LOG_YARN.info("Shutdown Request received"); - signalAMComplete(new ActionStopSlider("stop", - EXIT_SUCCESS, - FinalApplicationStatus.SUCCEEDED, - "Shutdown requested from RM")); - } - - /** - * Monitored nodes have been changed - * @param updatedNodes list of updated nodes - */ - @Override //AMRMClientAsync - public void onNodesUpdated(List updatedNodes) { - LOG_YARN.info("onNodesUpdated({})", updatedNodes.size()); - log.info("Updated nodes {}", updatedNodes); - // Check if any nodes are lost or revived and update state accordingly - - AppState.NodeUpdatedOutcome outcome = appState.onNodesUpdated(updatedNodes); - if (!outcome.operations.isEmpty()) { - execute(outcome.operations); - } - // trigger a review if the cluster changed - if (outcome.clusterChanged) { - reviewRequestAndReleaseNodes("nodes updated"); - } - } - - /** - * heartbeat operation; return the ratio of requested - * to actual - * @return progress - */ - @Override //AMRMClientAsync - public float getProgress() { - return appState.getApplicationProgressPercentage(); - } - - @Override //AMRMClientAsync - public void onError(Throwable e) { - if (e instanceof InvalidResourceRequestException) { - // stop the cluster - LOG_YARN.error("AMRMClientAsync.onError() received {}", e, e); - signalAMComplete(new ActionStopSlider("stop", EXIT_EXCEPTION_THROWN, - FinalApplicationStatus.FAILED, - SliderUtils.extractFirstLine(e.getLocalizedMessage()))); - } else if (e instanceof InvalidApplicationMasterRequestException) { - // halt the AM - LOG_YARN.error("AMRMClientAsync.onError() received {}", e, e); - queue(new ActionHalt(EXIT_EXCEPTION_THROWN, - SliderUtils.extractFirstLine(e.getLocalizedMessage()))); - } else { - // ignore and log - LOG_YARN.info("Ignoring AMRMClientAsync.onError() received {}", e); - } - } - -/* =================================================================== */ -/* RMOperationHandlerActions */ -/* =================================================================== */ - - - @Override - public void execute(List operations) { - rmOperationHandler.execute(operations); - } - - @Override - public void releaseAssignedContainer(ContainerId containerId) { - rmOperationHandler.releaseAssignedContainer(containerId); - } - - @Override - public void addContainerRequest(AMRMClient.ContainerRequest req) { - rmOperationHandler.addContainerRequest(req); - } - - @Override - public int cancelContainerRequests(Priority priority1, - Priority priority2, - int count) { - return rmOperationHandler.cancelContainerRequests(priority1, priority2, count); - } - - @Override - public void cancelSingleRequest(AMRMClient.ContainerRequest request) { - rmOperationHandler.cancelSingleRequest(request); - } - - @Override - public void updateBlacklist(List blacklistAdditions, - List blacklistRemovals) { - rmOperationHandler.updateBlacklist(blacklistAdditions, blacklistRemovals); - } - -/* =================================================================== */ -/* END */ -/* =================================================================== */ - - /** - * Launch the provider service - * @throws IOException - * @throws SliderException - */ - protected synchronized void launchProviderService() - throws IOException, SliderException { - // didn't start, so don't register - for (ProviderService providerService : providers) { -// providerService.start(); - } - // and send the started event ourselves - eventCallbackEvent(null); - } - - /* =================================================================== */ - /* EventCallback from the child or ourselves directly */ - /* =================================================================== */ - - @Override // ProviderCompleted - public void eventCallbackEvent(Object parameter) { - // now ask for the cluster nodes - try { - flexCluster(null); - } catch (Exception e) { - // cluster flex failure: log - log.error("Failed to flex cluster nodes: {}", e, e); - // then what? exit - queue(new ActionStopSlider(e)); - } - } - - - /** - * Async start container request - * @param container container - * @param ctx context - * @param instance node details - */ - public void startContainer(Container container, - ContainerLaunchContext ctx, - RoleInstance instance) throws IOException { - appState.containerStartSubmitted(container, instance); - - nmClientAsync.startContainerAsync(container, ctx); - } - - @Override // NMClientAsync.CallbackHandler - public void onContainerStopped(ContainerId containerId) { - // do nothing but log: container events from the AM - // are the source of container halt details to react to - log.info("onContainerStopped {} ", containerId); - } - - @Override // NMClientAsync.CallbackHandler - public void onContainerStarted(ContainerId containerId, - Map allServiceResponse) { - LOG_YARN.info("Started Container {} ", containerId); - RoleInstance cinfo = appState.onNodeManagerContainerStarted(containerId); - if (cinfo != null) { - LOG_YARN.info("Deployed instance of role {} onto {}", - cinfo.role, containerId); - //trigger an async container status - nmClientAsync.getContainerStatusAsync(containerId, - cinfo.container.getNodeId()); - // push out a registration - queue(new RegisterComponentInstance(containerId, cinfo, - 0, TimeUnit.MILLISECONDS)); - - } else { - //this is a hypothetical path not seen. We react by warning - log.error("Notified of started container that isn't pending {} - releasing", - containerId); - //then release it - asyncRMClient.releaseAssignedContainer(containerId); - } - } - - @Override // NMClientAsync.CallbackHandler - public void onStartContainerError(ContainerId containerId, Throwable t) { - LOG_YARN.error("Failed to start Container {}", containerId, t); - appState.onNodeManagerContainerStartFailed(containerId, t); - } - - @Override // NMClientAsync.CallbackHandler - public void onContainerStatusReceived(ContainerId containerId, - ContainerStatus containerStatus) { - LOG_YARN.debug("Container Status: id={}, status={}", containerId, - containerStatus); - RoleInstance cinfo = appState.getOwnedContainer(containerId); - if (cinfo == null) { - LOG_YARN.error("Owned container not found for {}", containerId); - return; - } - ProviderService providerService = ProviderFactory - .getProviderService(cinfo.providerRole.component.getArtifact()); -// if (providerService.processContainerStatus(containerId, containerStatus)) { -// try { -// Thread.sleep(1000); -// } catch (InterruptedException e) { -// } -// LOG_YARN.info("Re-requesting status for role {}, {}", -// cinfo.role, containerId); -// //trigger another async container status -// nmClientAsync.getContainerStatusAsync(containerId, -// cinfo.container.getNodeId()); -// } else if (timelineServiceEnabled) { -// RoleInstance instance = appState.getOwnedContainer(containerId); -// if (instance != null) { -// org.apache.slider.api.resource.Container container = -// instance.providerRole.component -// .getContainer(containerId.toString()); -// if (container != null) { -// serviceTimelinePublisher.componentInstanceUpdated(container); -// } -// } -// } - } - - @Override // NMClientAsync.CallbackHandler - public void onGetContainerStatusError( - ContainerId containerId, Throwable t) { - LOG_YARN.error("Failed to query the status of Container {}", containerId); - } - - @Override // NMClientAsync.CallbackHandler - public void onStopContainerError(ContainerId containerId, Throwable t) { - LOG_YARN.warn("Failed to stop Container {}", containerId); - } - - /** - * Queue an action for immediate execution in the executor thread - * @param action action to execute - */ - public void queue(AsyncAction action) { - actionQueues.put(action); - } - - /** - * Schedule an action - * @param action for delayed execution - */ - public void schedule(AsyncAction action) { - actionQueues.schedule(action); - } - - - /** - * Handle any exception in a thread. If the exception provides an exit - * code, that is the one that will be used - * @param thread thread throwing the exception - * @param exception exception - */ - public void onExceptionInThread(Thread thread, Throwable exception) { - log.error("Exception in {}: {}", thread.getName(), exception, exception); - - // if there is a teardown in progress, ignore it - if (amCompletionFlag.get()) { - log.info("Ignoring exception: shutdown in progress"); - } else { - int exitCode = EXIT_EXCEPTION_THROWN; - if (exception instanceof ExitCodeProvider) { - exitCode = ((ExitCodeProvider) exception).getExitCode(); - } - signalAMComplete(new ActionStopSlider("stop", - exitCode, - FinalApplicationStatus.FAILED, - SliderUtils.extractFirstLine(exception.getLocalizedMessage()))); - } - } - - /** - * TODO Read chaos monkey params from AM configuration rather than app - * configuration - * @return true if it started - */ - private boolean maybeStartMonkey() { - org.apache.slider.api.resource.Configuration configuration = - application.getConfiguration(); - boolean enabled = configuration.getPropertyBool( - InternalKeys.CHAOS_MONKEY_ENABLED, - InternalKeys.DEFAULT_CHAOS_MONKEY_ENABLED); - if (!enabled) { - log.debug("Chaos monkey disabled"); - return false; - } - - long monkeyInterval = SliderUtils.getTimeRange(configuration, - InternalKeys.CHAOS_MONKEY_INTERVAL, - InternalKeys.DEFAULT_CHAOS_MONKEY_INTERVAL_DAYS, - InternalKeys.DEFAULT_CHAOS_MONKEY_INTERVAL_HOURS, - InternalKeys.DEFAULT_CHAOS_MONKEY_INTERVAL_MINUTES, - 0); - if (monkeyInterval == 0) { - log.debug( - "Chaos monkey not configured with a time interval...not enabling"); - return false; - } - - long monkeyDelay = SliderUtils.getTimeRange(configuration, - InternalKeys.CHAOS_MONKEY_DELAY, - 0, - 0, - 0, - (int)monkeyInterval); - - log.info("Adding Chaos Monkey scheduled every {} seconds ({} hours -delay {}", - monkeyInterval, monkeyInterval/(60*60), monkeyDelay); - monkey = new ChaosMonkeyService(metrics, actionQueues); - initAndAddService(monkey); - - // configure the targets - - // launch failure: special case with explicit failure triggered now - int amLaunchFailProbability = configuration.getPropertyInt( - InternalKeys.CHAOS_MONKEY_PROBABILITY_AM_LAUNCH_FAILURE, - 0); - if (amLaunchFailProbability > 0 && monkey.chaosCheck( - amLaunchFailProbability)) { - log.info("Chaos Monkey has triggered AM Launch failure"); - // trigger a failure - ActionStopSlider stop = new ActionStopSlider("stop", - 0, TimeUnit.SECONDS, - LauncherExitCodes.EXIT_FALSE, - FinalApplicationStatus.FAILED, - E_TRIGGERED_LAUNCH_FAILURE); - queue(stop); - } - - int amKillProbability = configuration.getPropertyInt( - InternalKeys.CHAOS_MONKEY_PROBABILITY_AM_FAILURE, - InternalKeys.DEFAULT_CHAOS_MONKEY_PROBABILITY_AM_FAILURE); - monkey.addTarget("AM killer", - new ChaosKillAM(actionQueues, -1), amKillProbability); - int containerKillProbability = configuration.getPropertyInt( - InternalKeys.CHAOS_MONKEY_PROBABILITY_CONTAINER_FAILURE, - InternalKeys.DEFAULT_CHAOS_MONKEY_PROBABILITY_CONTAINER_FAILURE); - monkey.addTarget("Container killer", - new ChaosKillContainer(appState, actionQueues, rmOperationHandler), - containerKillProbability); - - // and schedule it - if (monkey.schedule(monkeyDelay, monkeyInterval, TimeUnit.SECONDS)) { - log.info("Chaos Monkey is running"); - return true; - } else { - log.info("Chaos monkey not started"); - return false; - } - } - - /** - * This is the main entry point for the service launcher. - * @param args command line arguments. - */ - public static void main(String[] args) { - - //turn the args to a list - List argsList = Arrays.asList(args); - //create a new list, as the ArrayList type doesn't push() on an insert - List extendedArgs = new ArrayList(argsList); - //insert the service name - extendedArgs.add(0, SERVICE_CLASSNAME); - //now have the service launcher do its work - ServiceLauncher.serviceMain(extendedArgs); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e00bb2ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java deleted file mode 100644 index a7b94ed..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.appmaster.actions; - -import org.apache.slider.api.proto.Messages; -import org.apache.slider.server.appmaster.SliderAppMaster; -import org.apache.slider.server.appmaster.state.AppState; - -import java.util.concurrent.TimeUnit; - -public class ActionFlexCluster extends AsyncAction { - - final Messages.FlexComponentsRequestProto requestProto; - public ActionFlexCluster(String name, long delay, TimeUnit timeUnit, - Messages.FlexComponentsRequestProto requestProto) { - super(name, delay, timeUnit, ATTR_CHANGES_APP_SIZE); - this.requestProto = requestProto; - } - @Override - public void execute(SliderAppMaster appMaster, - QueueAccess queueService, - AppState appState) throws Exception { - appMaster.flexCluster(requestProto); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e00bb2ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java deleted file mode 100644 index ee1bb72..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.appmaster.actions; - -import org.apache.hadoop.util.ExitUtil; -import org.apache.slider.server.appmaster.SliderAppMaster; -import org.apache.slider.server.appmaster.state.AppState; - -import java.util.concurrent.TimeUnit; - -/** - * Exit an emergency JVM halt. - * @see ExitUtil#halt(int, String) - */ -public class ActionHalt extends AsyncAction { - - private final int status; - private final String text; - - public ActionHalt( - int status, - String text) { - this(status, text, 0, TimeUnit.MILLISECONDS); - } - - public ActionHalt( - int status, - String text, - long delay, TimeUnit timeUnit) { - - // do not declare that this action halts the cluster ... keep it a surprise - super("Halt", delay, timeUnit); - this.status = status; - this.text = text; - } - - @Override - public void execute(SliderAppMaster appMaster, - QueueAccess queueService, - AppState appState) throws Exception { - ExitUtil.halt(status, text); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org