Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 91F0C200BD1 for ; Mon, 28 Nov 2016 22:16:02 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 90B5F160B22; Mon, 28 Nov 2016 21:16:02 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 24DD2160B27 for ; Mon, 28 Nov 2016 22:15:59 +0100 (CET) Received: (qmail 47593 invoked by uid 500); 28 Nov 2016 21:15:51 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 46185 invoked by uid 99); 28 Nov 2016 21:15:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 28 Nov 2016 21:15:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 61D56F0DBF; Mon, 28 Nov 2016 21:15:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianhe@apache.org To: common-commits@hadoop.apache.org Date: Mon, 28 Nov 2016 21:16:07 -0000 Message-Id: <6c27ad1043a94af7b33432702de257ea@git.apache.org> In-Reply-To: <3022f5161bb4468c92a2825216e9111f@git.apache.org> References: <3022f5161bb4468c92a2825216e9111f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [19/63] [abbrv] hadoop git commit: YARN-5461. Initial code ported from slider-core module. (jianhe) archived-at: Mon, 28 Nov 2016 21:16:02 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/a379904d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java new file mode 100644 index 0000000..4ffae7c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -0,0 +1,3212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.providers.agent; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ProtocolTypes; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; +import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.slider.api.ClusterDescription; +import org.apache.slider.api.ClusterNode; +import org.apache.slider.api.InternalKeys; +import org.apache.slider.api.OptionKeys; +import org.apache.slider.api.ResourceKeys; +import org.apache.slider.api.StatusKeys; +import org.apache.slider.common.SliderExitCodes; +import org.apache.slider.common.SliderKeys; +import org.apache.slider.common.SliderXmlConfKeys; +import org.apache.slider.common.tools.SliderFileSystem; +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.core.conf.AggregateConf; +import org.apache.slider.core.conf.ConfTreeOperations; +import org.apache.slider.core.conf.MapOperations; +import org.apache.slider.core.exceptions.BadCommandArgumentsException; +import org.apache.slider.core.exceptions.BadConfigException; +import org.apache.slider.core.exceptions.NoSuchNodeException; +import org.apache.slider.core.exceptions.SliderException; +import org.apache.slider.core.launch.CommandLineBuilder; +import org.apache.slider.core.launch.ContainerLauncher; +import org.apache.slider.core.registry.docstore.ConfigFormat; +import org.apache.slider.core.registry.docstore.ConfigUtils; +import org.apache.slider.core.registry.docstore.ExportEntry; +import org.apache.slider.core.registry.docstore.PublishedConfiguration; +import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter; +import org.apache.slider.core.registry.docstore.PublishedExports; +import org.apache.slider.core.registry.info.CustomRegistryConstants; +import org.apache.slider.providers.AbstractProviderService; +import org.apache.slider.providers.MonitorDetail; +import org.apache.slider.providers.ProviderCore; +import org.apache.slider.providers.ProviderRole; +import org.apache.slider.providers.ProviderUtils; +import org.apache.slider.providers.agent.application.metadata.AbstractComponent; +import org.apache.slider.providers.agent.application.metadata.Application; +import org.apache.slider.providers.agent.application.metadata.CommandScript; +import org.apache.slider.providers.agent.application.metadata.Component; +import org.apache.slider.providers.agent.application.metadata.ComponentCommand; +import org.apache.slider.providers.agent.application.metadata.ComponentExport; +import org.apache.slider.providers.agent.application.metadata.ComponentsInAddonPackage; +import org.apache.slider.providers.agent.application.metadata.ConfigFile; +import org.apache.slider.providers.agent.application.metadata.DefaultConfig; +import org.apache.slider.providers.agent.application.metadata.DockerContainer; +import org.apache.slider.providers.agent.application.metadata.Export; +import org.apache.slider.providers.agent.application.metadata.ExportGroup; +import org.apache.slider.providers.agent.application.metadata.Metainfo; +import org.apache.slider.providers.agent.application.metadata.OSPackage; +import org.apache.slider.providers.agent.application.metadata.OSSpecific; +import org.apache.slider.providers.agent.application.metadata.Package; +import org.apache.slider.providers.agent.application.metadata.PropertyInfo; +import org.apache.slider.server.appmaster.actions.ProviderReportedContainerLoss; +import org.apache.slider.server.appmaster.actions.RegisterComponentInstance; +import org.apache.slider.server.appmaster.state.ContainerPriority; +import org.apache.slider.server.appmaster.state.RoleInstance; +import org.apache.slider.server.appmaster.state.StateAccessForProviders; +import org.apache.slider.server.appmaster.web.rest.agent.AgentCommandType; +import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations; +import org.apache.slider.server.appmaster.web.rest.agent.CommandReport; +import org.apache.slider.server.appmaster.web.rest.agent.ComponentStatus; +import org.apache.slider.server.appmaster.web.rest.agent.ExecutionCommand; +import org.apache.slider.server.appmaster.web.rest.agent.HeartBeat; +import org.apache.slider.server.appmaster.web.rest.agent.HeartBeatResponse; +import org.apache.slider.server.appmaster.web.rest.agent.Register; +import org.apache.slider.server.appmaster.web.rest.agent.RegistrationResponse; +import org.apache.slider.server.appmaster.web.rest.agent.RegistrationStatus; +import org.apache.slider.server.appmaster.web.rest.agent.StatusCommand; +import org.apache.slider.server.services.security.CertificateManager; +import org.apache.slider.server.services.security.SecurityStore; +import org.apache.slider.server.services.security.StoresGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Scanner; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.slider.server.appmaster.web.rest.RestPaths.SLIDER_PATH_AGENTS; + +/** + * This class implements the server-side logic for application deployment through Slider application package + */ +public class AgentProviderService extends AbstractProviderService implements + ProviderCore, + AgentKeys, + SliderKeys, AgentRestOperations { + + + protected static final Logger log = + LoggerFactory.getLogger(AgentProviderService.class); + private static final ProviderUtils providerUtils = new ProviderUtils(log); + private static final String LABEL_MAKER = "___"; + private static final String CONTAINER_ID = "container_id"; + private static final String GLOBAL_CONFIG_TAG = "global"; + private static final String LOG_FOLDERS_TAG = "LogFolders"; + private static final String HOST_FOLDER_FORMAT = "%s:%s"; + private static final String CONTAINER_LOGS_TAG = "container_log_dirs"; + private static final String CONTAINER_PWDS_TAG = "container_work_dirs"; + private static final String COMPONENT_TAG = "component"; + private static final String APPLICATION_TAG = "application"; + private static final String COMPONENT_DATA_TAG = "ComponentInstanceData"; + private static final String SHARED_PORT_TAG = "SHARED"; + private static final String PER_CONTAINER_TAG = "{PER_CONTAINER}"; + private static final int MAX_LOG_ENTRIES = 40; + private static final int DEFAULT_HEARTBEAT_MONITOR_INTERVAL = 60 * 1000; + + private final Object syncLock = new Object(); + private final ComponentTagProvider tags = new ComponentTagProvider(); + private int heartbeatMonitorInterval = 0; + private AgentClientProvider clientProvider; + private AtomicInteger taskId = new AtomicInteger(0); + private volatile Metainfo metaInfo = null; + private SliderFileSystem fileSystem = null; + private Map defaultConfigs = null; + private ComponentCommandOrder commandOrder = null; + private HeartbeatMonitor monitor; + private Boolean canAnyMasterPublish = null; + private AgentLaunchParameter agentLaunchParameter = null; + private String clusterName = null; + private boolean isInUpgradeMode; + private Set upgradeContainers = new HashSet(); + private boolean appStopInitiated; + + private final Map componentStatuses = + new ConcurrentHashMap(); + private final Map> componentInstanceData = + new ConcurrentHashMap>(); + private final Map>> exportGroups = + new ConcurrentHashMap>>(); + private final Map> allocatedPorts = + new ConcurrentHashMap>(); + private final Map packageMetainfo = + new ConcurrentHashMap(); + + private final Map logFolderExports = + Collections.synchronizedMap(new LinkedHashMap(MAX_LOG_ENTRIES, 0.75f, false) { + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > MAX_LOG_ENTRIES; + } + }); + private final Map workFolderExports = + Collections.synchronizedMap(new LinkedHashMap(MAX_LOG_ENTRIES, 0.75f, false) { + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > MAX_LOG_ENTRIES; + } + }); + private final Map> containerExportsMap = + new HashMap>(); + + /** + * Create an instance of AgentProviderService + */ + public AgentProviderService() { + super("AgentProviderService"); + setAgentRestOperations(this); + setHeartbeatMonitorInterval(DEFAULT_HEARTBEAT_MONITOR_INTERVAL); + } + + @Override + public String getHumanName() { + return "Slider Agent"; + } + + @Override + public List getRoles() { + return AgentRoles.getRoles(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + clientProvider = new AgentClientProvider(conf); + } + + @Override + public Configuration loadProviderConfigurationInformation(File confDir) throws + BadCommandArgumentsException, + IOException { + return new Configuration(false); + } + + @Override + public void validateInstanceDefinition(AggregateConf instanceDefinition) + throws + SliderException { + clientProvider.validateInstanceDefinition(instanceDefinition, null); + + ConfTreeOperations resources = + instanceDefinition.getResourceOperations(); + + Set names = resources.getComponentNames(); + names.remove(SliderKeys.COMPONENT_AM); + for (String name : names) { + Component componentDef = getMetaInfo().getApplicationComponent(name); + if (componentDef == null) { + throw new BadConfigException( + "Component %s is not a member of application.", name); + } + + MapOperations componentConfig = resources.getMandatoryComponent(name); + int count = + componentConfig.getMandatoryOptionInt(ResourceKeys.COMPONENT_INSTANCES); + int definedMinCount = componentDef.getMinInstanceCountInt(); + int definedMaxCount = componentDef.getMaxInstanceCountInt(); + if (count < definedMinCount || count > definedMaxCount) { + throw new BadConfigException("Component %s, %s value %d out of range. " + + "Expected minimum is %d and maximum is %d", + name, + ResourceKeys.COMPONENT_INSTANCES, + count, + definedMinCount, + definedMaxCount); + } + } + } + + // Reads the metainfo.xml in the application package and loads it + private void buildMetainfo(AggregateConf instanceDefinition, + SliderFileSystem fileSystem) throws IOException, SliderException { + String appDef = SliderUtils.getApplicationDefinitionPath(instanceDefinition + .getAppConfOperations()); + + if (metaInfo == null) { + synchronized (syncLock) { + if (metaInfo == null) { + this.fileSystem = fileSystem; + readAndSetHeartbeatMonitoringInterval(instanceDefinition); + initializeAgentDebugCommands(instanceDefinition); + + metaInfo = getApplicationMetainfo(fileSystem, appDef, false); + log.info("Master package metainfo: {}", metaInfo.toString()); + if (metaInfo == null || metaInfo.getApplication() == null) { + log.error("metainfo.xml is unavailable or malformed at {}.", appDef); + throw new SliderException( + "metainfo.xml is required in app package."); + } + commandOrder = new ComponentCommandOrder(metaInfo.getApplication().getCommandOrders()); + defaultConfigs = initializeDefaultConfigs(fileSystem, appDef, metaInfo); + monitor = new HeartbeatMonitor(this, getHeartbeatMonitorInterval()); + monitor.start(); + + // build a map from component to metainfo + String addonAppDefString = instanceDefinition.getAppConfOperations() + .getGlobalOptions().getOption(AgentKeys.ADDONS, null); + log.debug("All addon appdefs: {}", addonAppDefString); + if (addonAppDefString != null) { + Scanner scanner = new Scanner(addonAppDefString).useDelimiter(","); + while (scanner.hasNext()) { + String addonAppDef = scanner.next(); + String addonAppDefPath = instanceDefinition + .getAppConfOperations().getGlobalOptions().get(addonAppDef); + log.debug("Addon package {} is stored at: {}", addonAppDef + + addonAppDefPath); + Metainfo addonMetaInfo = getApplicationMetainfo(fileSystem, + addonAppDefPath, true); + addonMetaInfo.validate(); + packageMetainfo.put(addonMetaInfo.getApplicationPackage() + .getName(), addonMetaInfo); + } + log.info("Metainfo map for master and addon: {}", + packageMetainfo.toString()); + } + } + } + } + } + + @Override + public void initializeApplicationConfiguration( + AggregateConf instanceDefinition, SliderFileSystem fileSystem) + throws IOException, SliderException { + buildMetainfo(instanceDefinition, fileSystem); + } + + @Override + public void buildContainerLaunchContext(ContainerLauncher launcher, + AggregateConf instanceDefinition, + Container container, + ProviderRole providerRole, + SliderFileSystem fileSystem, + Path generatedConfPath, + MapOperations resourceComponent, + MapOperations appComponent, + Path containerTmpDirPath) throws + IOException, + SliderException { + + String roleName = providerRole.name; + String roleGroup = providerRole.group; + String appDef = SliderUtils.getApplicationDefinitionPath(instanceDefinition + .getAppConfOperations()); + + initializeApplicationConfiguration(instanceDefinition, fileSystem); + + log.info("Build launch context for Agent"); + log.debug(instanceDefinition.toString()); + + //if we are launching docker based app on yarn, then we need to pass docker image + if (isYarnDockerContainer(roleGroup)) { + launcher.setYarnDockerMode(true); + launcher.setDockerImage(getConfigFromMetaInfo(roleGroup, "image")); + launcher.setRunPrivilegedContainer(getConfigFromMetaInfo(roleGroup, "runPriviledgedContainer")); + launcher + .setYarnContainerMountPoints(getConfigFromMetaInfoWithAppConfigOverriding( + roleGroup, "yarn.container.mount.points")); + } + + // Set the environment + launcher.putEnv(SliderUtils.buildEnvMap(appComponent, + getStandardTokenMap(getAmState().getAppConfSnapshot(), roleName, roleGroup))); + + String workDir = ApplicationConstants.Environment.PWD.$(); + launcher.setEnv("AGENT_WORK_ROOT", workDir); + log.info("AGENT_WORK_ROOT set to {}", workDir); + String logDir = ApplicationConstants.LOG_DIR_EXPANSION_VAR; + launcher.setEnv("AGENT_LOG_ROOT", logDir); + log.info("AGENT_LOG_ROOT set to {}", logDir); + if (System.getenv(HADOOP_USER_NAME) != null) { + launcher.setEnv(HADOOP_USER_NAME, System.getenv(HADOOP_USER_NAME)); + } + // for 2-Way SSL + launcher.setEnv(SLIDER_PASSPHRASE, instanceDefinition.getPassphrase()); + //add english env + launcher.setEnv("LANG", "en_US.UTF-8"); + launcher.setEnv("LC_ALL", "en_US.UTF-8"); + launcher.setEnv("LANGUAGE", "en_US.UTF-8"); + + //local resources + + // TODO: Should agent need to support App Home + String scriptPath = new File(AgentKeys.AGENT_MAIN_SCRIPT_ROOT, AgentKeys.AGENT_MAIN_SCRIPT).getPath(); + String appHome = instanceDefinition.getAppConfOperations(). + getGlobalOptions().get(AgentKeys.PACKAGE_PATH); + if (SliderUtils.isSet(appHome)) { + scriptPath = new File(appHome, AgentKeys.AGENT_MAIN_SCRIPT).getPath(); + } + + // set PYTHONPATH + List pythonPaths = new ArrayList(); + pythonPaths.add(AgentKeys.AGENT_MAIN_SCRIPT_ROOT); + pythonPaths.add(AgentKeys.AGENT_JINJA2_ROOT); + String pythonPath = StringUtils.join(File.pathSeparator, pythonPaths); + launcher.setEnv(PYTHONPATH, pythonPath); + log.info("PYTHONPATH set to {}", pythonPath); + + Path agentImagePath = null; + String agentImage = instanceDefinition.getInternalOperations(). + get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH); + if (SliderUtils.isUnset(agentImage)) { + agentImagePath = + new Path(new Path(new Path(instanceDefinition.getInternalOperations().get(InternalKeys.INTERNAL_TMP_DIR), + container.getId().getApplicationAttemptId().getApplicationId().toString()), + AgentKeys.PROVIDER_AGENT), + SliderKeys.AGENT_TAR); + } else { + agentImagePath = new Path(agentImage); + } + + if (fileSystem.getFileSystem().exists(agentImagePath)) { + LocalResource agentImageRes = fileSystem.createAmResource(agentImagePath, LocalResourceType.ARCHIVE); + launcher.addLocalResource(AgentKeys.AGENT_INSTALL_DIR, agentImageRes); + } else { + String msg = + String.format("Required agent image slider-agent.tar.gz is unavailable at %s", agentImagePath.toString()); + MapOperations compOps = appComponent; + boolean relaxVerificationForTest = compOps != null ? Boolean.valueOf(compOps. + getOptionBool(AgentKeys.TEST_RELAX_VERIFICATION, false)) : false; + log.error(msg); + + if (!relaxVerificationForTest) { + throw new SliderException(SliderExitCodes.EXIT_DEPLOYMENT_FAILED, msg); + } + } + + log.info("Using {} for agent.", scriptPath); + LocalResource appDefRes = fileSystem.createAmResource( + fileSystem.getFileSystem().resolvePath(new Path(appDef)), + LocalResourceType.ARCHIVE); + launcher.addLocalResource(AgentKeys.APP_DEFINITION_DIR, appDefRes); + + for (Package pkg : getMetaInfo().getApplication().getPackages()) { + Path pkgPath = fileSystem.buildResourcePath(pkg.getName()); + if (!fileSystem.isFile(pkgPath)) { + pkgPath = fileSystem.buildResourcePath(getClusterName(), + pkg.getName()); + } + if (!fileSystem.isFile(pkgPath)) { + throw new IOException("Package doesn't exist as a resource: " + + pkg.getName()); + } + log.info("Adding resource {}", pkg.getName()); + LocalResourceType type = LocalResourceType.FILE; + if ("archive".equals(pkg.getType())) { + type = LocalResourceType.ARCHIVE; + } + LocalResource packageResource = fileSystem.createAmResource( + pkgPath, type); + launcher.addLocalResource(AgentKeys.APP_PACKAGES_DIR, packageResource); + } + + String agentConf = instanceDefinition.getAppConfOperations(). + getGlobalOptions().getOption(AgentKeys.AGENT_CONF, ""); + if (SliderUtils.isSet(agentConf)) { + LocalResource agentConfRes = fileSystem.createAmResource(fileSystem + .getFileSystem().resolvePath(new Path(agentConf)), + LocalResourceType.FILE); + launcher.addLocalResource(AgentKeys.AGENT_CONFIG_FILE, agentConfRes); + } + + String agentVer = instanceDefinition.getAppConfOperations(). + getGlobalOptions().getOption(AgentKeys.AGENT_VERSION, null); + if (agentVer != null) { + LocalResource agentVerRes = fileSystem.createAmResource( + fileSystem.getFileSystem().resolvePath(new Path(agentVer)), + LocalResourceType.FILE); + launcher.addLocalResource(AgentKeys.AGENT_VERSION_FILE, agentVerRes); + } + + if (SliderUtils.isHadoopClusterSecure(getConfig())) { + localizeServiceKeytabs(launcher, instanceDefinition, fileSystem); + } + + MapOperations amComponent = instanceDefinition. + getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM); + boolean twoWayEnabled = amComponent != null ? Boolean.valueOf(amComponent. + getOptionBool(AgentKeys.KEY_AGENT_TWO_WAY_SSL_ENABLED, false)) : false; + if (twoWayEnabled) { + localizeContainerSSLResources(launcher, container, fileSystem); + } + + MapOperations compOps = appComponent; + if (areStoresRequested(compOps)) { + localizeContainerSecurityStores(launcher, container, roleName, fileSystem, + instanceDefinition, compOps); + } + + //add the configuration resources + launcher.addLocalResources(fileSystem.submitDirectory( + generatedConfPath, + SliderKeys.PROPAGATED_CONF_DIR_NAME)); + + if (appComponent.getOptionBool(AgentKeys.AM_CONFIG_GENERATION, false)) { + // build and localize configuration files + Map> configurations = + buildCommandConfigurations(instanceDefinition.getAppConfOperations(), + container.getId().toString(), roleName, roleGroup); + localizeConfigFiles(launcher, roleName, roleGroup, getMetaInfo(), + configurations, launcher.getEnv(), fileSystem); + } + + String label = getContainerLabel(container, roleName, roleGroup); + CommandLineBuilder operation = new CommandLineBuilder(); + + String pythonExec = instanceDefinition.getAppConfOperations() + .getGlobalOptions().getOption(SliderXmlConfKeys.PYTHON_EXECUTABLE_PATH, + AgentKeys.PYTHON_EXE); + + operation.add(pythonExec); + + operation.add(scriptPath); + operation.add(ARG_LABEL, label); + operation.add(ARG_ZOOKEEPER_QUORUM); + operation.add(getClusterOptionPropertyValue(OptionKeys.ZOOKEEPER_QUORUM)); + operation.add(ARG_ZOOKEEPER_REGISTRY_PATH); + operation.add(getZkRegistryPath()); + + String debugCmd = agentLaunchParameter.getNextLaunchParameter(roleGroup); + if (SliderUtils.isSet(debugCmd)) { + operation.add(ARG_DEBUG); + operation.add(debugCmd); + } + + operation.add("> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + + AgentKeys.AGENT_OUT_FILE + " 2>&1"); + + launcher.addCommand(operation.build()); + + // localize addon package + String addonAppDefString = instanceDefinition.getAppConfOperations() + .getGlobalOptions().getOption(AgentKeys.ADDONS, null); + log.debug("All addon appdefs: {}", addonAppDefString); + if (addonAppDefString != null) { + Scanner scanner = new Scanner(addonAppDefString).useDelimiter(","); + while (scanner.hasNext()) { + String addonAppDef = scanner.next(); + String addonAppDefPath = instanceDefinition + .getAppConfOperations().getGlobalOptions().get(addonAppDef); + log.debug("Addon package {} is stored at: {}", addonAppDef, addonAppDefPath); + LocalResource addonPkgRes = fileSystem.createAmResource( + fileSystem.getFileSystem().resolvePath(new Path(addonAppDefPath)), + LocalResourceType.ARCHIVE); + launcher.addLocalResource(AgentKeys.ADDON_DEFINITION_DIR + "/" + addonAppDef, addonPkgRes); + } + log.debug("Metainfo map for master and addon: {}", + packageMetainfo.toString()); + } + + // Additional files to localize in addition to the application def + String appResourcesString = instanceDefinition.getAppConfOperations() + .getGlobalOptions().getOption(AgentKeys.APP_RESOURCES, null); + log.info("Configuration value for extra resources to localize: {}", appResourcesString); + if (null != appResourcesString) { + try (Scanner scanner = new Scanner(appResourcesString).useDelimiter(",")) { + while (scanner.hasNext()) { + String resource = scanner.next(); + Path resourcePath = new Path(resource); + LocalResource extraResource = fileSystem.createAmResource( + fileSystem.getFileSystem().resolvePath(resourcePath), + LocalResourceType.FILE); + String destination = AgentKeys.APP_RESOURCES_DIR + "/" + resourcePath.getName(); + log.info("Localizing {} to {}", resourcePath, destination); + // TODO Can we try harder to avoid collisions? + launcher.addLocalResource(destination, extraResource); + } + } + } + + // initialize addon pkg states for all componentInstanceStatus + Map pkgStatuses = new TreeMap<>(); + for (Metainfo appPkg : packageMetainfo.values()) { + // check each component of that addon to see if they apply to this + // component 'role' + for (ComponentsInAddonPackage comp : appPkg.getApplicationPackage() + .getComponents()) { + log.debug("Current component: {} component in metainfo: {}", roleName, + comp.getName()); + if (comp.getName().equals(roleGroup) + || comp.getName().equals(AgentKeys.ADDON_FOR_ALL_COMPONENTS)) { + pkgStatuses.put(appPkg.getApplicationPackage().getName(), State.INIT); + } + } + } + log.debug("For component: {} pkg status map: {}", roleName, + pkgStatuses.toString()); + + // initialize the component instance state + getComponentStatuses().put(label, + new ComponentInstanceState( + roleName, + container.getId(), + getClusterInfoPropertyValue(OptionKeys.APPLICATION_NAME), + pkgStatuses)); + } + + private void localizeContainerSecurityStores(ContainerLauncher launcher, + Container container, + String role, + SliderFileSystem fileSystem, + AggregateConf instanceDefinition, + MapOperations compOps) + throws SliderException, IOException { + // generate and localize security stores + SecurityStore[] stores = generateSecurityStores(container, role, + instanceDefinition, compOps); + for (SecurityStore store : stores) { + LocalResource keystoreResource = fileSystem.createAmResource( + uploadSecurityResource(store.getFile(), fileSystem), LocalResourceType.FILE); + launcher.addLocalResource(String.format("secstores/%s-%s.p12", + store.getType(), role), + keystoreResource); + } + } + + private SecurityStore[] generateSecurityStores(Container container, + String role, + AggregateConf instanceDefinition, + MapOperations compOps) + throws SliderException, IOException { + return StoresGenerator.generateSecurityStores(container.getNodeId().getHost(), + container.getId().toString(), role, + instanceDefinition, compOps); + } + + private boolean areStoresRequested(MapOperations compOps) { + return compOps != null ? compOps. + getOptionBool(SliderKeys.COMP_STORES_REQUIRED_KEY, false) : false; + } + + private void localizeContainerSSLResources(ContainerLauncher launcher, + Container container, + SliderFileSystem fileSystem) + throws SliderException { + try { + // localize server cert + Path certsDir = fileSystem.buildClusterSecurityDirPath(getClusterName()); + LocalResource certResource = fileSystem.createAmResource( + new Path(certsDir, SliderKeys.CRT_FILE_NAME), + LocalResourceType.FILE); + launcher.addLocalResource(AgentKeys.CERT_FILE_LOCALIZATION_PATH, + certResource); + + // generate and localize agent cert + CertificateManager certMgr = new CertificateManager(); + String hostname = container.getNodeId().getHost(); + String containerId = container.getId().toString(); + certMgr.generateContainerCertificate(hostname, containerId); + LocalResource agentCertResource = fileSystem.createAmResource( + uploadSecurityResource( + CertificateManager.getAgentCertficateFilePath(containerId), + fileSystem), LocalResourceType.FILE); + // still using hostname as file name on the agent side, but the files + // do end up under the specific container's file space + launcher.addLocalResource(AgentKeys.INFRA_RUN_SECURITY_DIR + hostname + + ".crt", agentCertResource); + LocalResource agentKeyResource = fileSystem.createAmResource( + uploadSecurityResource( + CertificateManager.getAgentKeyFilePath(containerId), fileSystem), + LocalResourceType.FILE); + launcher.addLocalResource(AgentKeys.INFRA_RUN_SECURITY_DIR + hostname + + ".key", agentKeyResource); + + } catch (Exception e) { + throw new SliderException(SliderExitCodes.EXIT_DEPLOYMENT_FAILED, e, + "Unable to localize certificates. Two-way SSL cannot be enabled"); + } + } + + private Path uploadSecurityResource(File resource, SliderFileSystem fileSystem) + throws IOException { + Path certsDir = fileSystem.buildClusterSecurityDirPath(getClusterName()); + return uploadResource(resource, fileSystem, certsDir); + } + + private Path uploadResource(File resource, SliderFileSystem fileSystem, + String roleName) throws IOException { + Path dir; + if (roleName == null) { + dir = fileSystem.buildClusterResourcePath(getClusterName()); + } else { + dir = fileSystem.buildClusterResourcePath(getClusterName(), roleName); + } + return uploadResource(resource, fileSystem, dir); + } + + private static synchronized Path uploadResource(File resource, + SliderFileSystem fileSystem, Path parentDir) throws IOException { + if (!fileSystem.getFileSystem().exists(parentDir)) { + fileSystem.getFileSystem().mkdirs(parentDir, + new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE)); + } + Path destPath = new Path(parentDir, resource.getName()); + if (!fileSystem.getFileSystem().exists(destPath)) { + FSDataOutputStream os = null; + try { + os = fileSystem.getFileSystem().create(destPath); + byte[] contents = FileUtils.readFileToByteArray(resource); + os.write(contents, 0, contents.length); + os.flush(); + } finally { + IOUtils.closeStream(os); + } + log.info("Uploaded {} to localization path {}", resource, destPath); + } else { + log.info("Resource {} already existed at localization path {}", resource, + destPath); + } + + while (!fileSystem.getFileSystem().exists(destPath)) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + // ignore + } + } + + fileSystem.getFileSystem().setPermission(destPath, + new FsPermission(FsAction.READ, FsAction.NONE, FsAction.NONE)); + + return destPath; + } + + private void localizeServiceKeytabs(ContainerLauncher launcher, + AggregateConf instanceDefinition, + SliderFileSystem fileSystem) + throws IOException { + String keytabPathOnHost = instanceDefinition.getAppConfOperations() + .getComponent(SliderKeys.COMPONENT_AM).get( + SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH); + if (SliderUtils.isUnset(keytabPathOnHost)) { + String amKeytabName = instanceDefinition.getAppConfOperations() + .getComponent(SliderKeys.COMPONENT_AM).get( + SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME); + String keytabDir = instanceDefinition.getAppConfOperations() + .getComponent(SliderKeys.COMPONENT_AM).get( + SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR); + // we need to localize the keytab files in the directory + Path keytabDirPath = fileSystem.buildKeytabPath(keytabDir, null, + getClusterName()); + boolean serviceKeytabsDeployed = false; + if (fileSystem.getFileSystem().exists(keytabDirPath)) { + FileStatus[] keytabs = fileSystem.getFileSystem().listStatus(keytabDirPath); + LocalResource keytabRes; + for (FileStatus keytab : keytabs) { + if (!amKeytabName.equals(keytab.getPath().getName()) + && keytab.getPath().getName().endsWith(".keytab")) { + serviceKeytabsDeployed = true; + log.info("Localizing keytab {}", keytab.getPath().getName()); + keytabRes = fileSystem.createAmResource(keytab.getPath(), + LocalResourceType.FILE); + launcher.addLocalResource(SliderKeys.KEYTAB_DIR + "/" + + keytab.getPath().getName(), + keytabRes); + } + } + } + if (!serviceKeytabsDeployed) { + log.warn("No service keytabs for the application have been localized. " + + "If the application requires keytabs for secure operation, " + + "please ensure that the required keytabs have been uploaded " + + "to the folder {}", keytabDirPath); + } + } + } + + private void createConfigFile(SliderFileSystem fileSystem, File file, + ConfigFile configFile, Map config) + throws IOException { + ConfigFormat configFormat = ConfigFormat.resolve(configFile.getType()); + log.info("Writing {} file {}", configFormat, file); + + ConfigUtils.prepConfigForTemplateOutputter(configFormat, config, + fileSystem, getClusterName(), file.getName()); + PublishedConfiguration publishedConfiguration = + new PublishedConfiguration(configFile.getDictionaryName(), + config.entrySet()); + PublishedConfigurationOutputter configurationOutputter = + PublishedConfigurationOutputter.createOutputter(configFormat, + publishedConfiguration); + configurationOutputter.save(file); + } + + @VisibleForTesting + protected void localizeConfigFiles(ContainerLauncher launcher, + String roleName, String roleGroup, + Metainfo metainfo, + Map> configs, + MapOperations env, + SliderFileSystem fileSystem) + throws IOException { + for (ConfigFile configFile : metainfo.getComponentConfigFiles(roleGroup)) { + Map config = ConfigUtils.replacePropsInConfig( + configs.get(configFile.getDictionaryName()), env.options); + String fileName = ConfigUtils.replaceProps(config, + configFile.getFileName()); + File localFile = new File(SliderKeys.RESOURCE_DIR); + if (!localFile.exists()) { + localFile.mkdir(); + } + localFile = new File(localFile, new File(fileName).getName()); + + String folder = null; + if ("true".equals(config.get(PER_COMPONENT))) { + folder = roleName; + } else if ("true".equals(config.get(PER_GROUP))) { + folder = roleGroup; + } + + log.info("Localizing {} configs to config file {} (destination {}) " + + "based on {} configs", config.size(), localFile, fileName, + configFile.getDictionaryName()); + createConfigFile(fileSystem, localFile, configFile, config); + Path destPath = uploadResource(localFile, fileSystem, folder); + LocalResource configResource = fileSystem.createAmResource(destPath, + LocalResourceType.FILE); + + File destFile = new File(fileName); + if (destFile.isAbsolute()) { + launcher.addLocalResource( + SliderKeys.RESOURCE_DIR + "/" + destFile.getName(), + configResource, fileName); + } else { + launcher.addLocalResource(AgentKeys.APP_CONF_DIR + "/" + fileName, + configResource); + } + } + } + + /** + * build the zookeeper registry path. + * + * @return the path the service registered at + * @throws NullPointerException if the service has not yet registered + */ + private String getZkRegistryPath() { + Preconditions.checkNotNull(yarnRegistry, "Yarn registry not bound"); + String path = yarnRegistry.getAbsoluteSelfRegistrationPath(); + Preconditions.checkNotNull(path, "Service record path not defined"); + return path; + } + + @Override + public void rebuildContainerDetails(List liveContainers, + String applicationId, Map providerRoleMap) { + for (Container container : liveContainers) { + // get the role name and label + ProviderRole role = providerRoleMap.get(ContainerPriority + .extractRole(container)); + if (role != null) { + String roleName = role.name; + String label = getContainerLabel(container, roleName, role.group); + log.info("Rebuilding in-memory: container {} in role {} in cluster {}", + container.getId(), roleName, applicationId); + getComponentStatuses().put(label, + new ComponentInstanceState(roleName, container.getId(), + applicationId)); + } else { + log.warn("Role not found for container {} in cluster {}", + container.getId(), applicationId); + } + } + } + + @Override + public boolean isSupportedRole(String role) { + return true; + } + + /** + * Handle registration calls from the agents + * + * @param registration registration entry + * + * @return response + */ + @Override + public RegistrationResponse handleRegistration(Register registration) { + log.info("Handling registration: {}", registration); + RegistrationResponse response = new RegistrationResponse(); + String label = registration.getLabel(); + String pkg = registration.getPkg(); + State agentState = registration.getActualState(); + String appVersion = registration.getAppVersion(); + + log.info("label: {} pkg: {}", label, pkg); + + if (getComponentStatuses().containsKey(label)) { + response.setResponseStatus(RegistrationStatus.OK); + ComponentInstanceState componentStatus = getComponentStatuses().get(label); + componentStatus.heartbeat(System.currentTimeMillis()); + updateComponentStatusWithAgentState(componentStatus, agentState); + + String roleName = getRoleName(label); + String roleGroup = getRoleGroup(label); + String containerId = getContainerId(label); + + if (SliderUtils.isSet(registration.getTags())) { + tags.recordAssignedTag(roleName, containerId, registration.getTags()); + } else { + response.setTags(tags.getTag(roleName, containerId)); + } + + String hostFqdn = registration.getPublicHostname(); + Map ports = registration.getAllocatedPorts(); + if (ports != null && !ports.isEmpty()) { + processAllocatedPorts(hostFqdn, roleName, roleGroup, containerId, ports); + } + + Map folders = registration.getLogFolders(); + if (folders != null && !folders.isEmpty()) { + publishFolderPaths(folders, containerId, roleName, hostFqdn); + } + + // Set app version if empty. It gets unset during upgrade - why? + checkAndSetContainerAppVersion(containerId, appVersion); + } else { + response.setResponseStatus(RegistrationStatus.FAILED); + response.setLog("Label not recognized."); + log.warn("Received registration request from unknown label {}", label); + } + log.info("Registration response: {}", response); + return response; + } + + // Checks if app version is empty. Sets it to the version as reported by the + // container during registration phase. + private void checkAndSetContainerAppVersion(String containerId, + String appVersion) { + StateAccessForProviders amState = getAmState(); + try { + RoleInstance role = amState.getOwnedContainer(containerId); + if (role != null) { + String currentAppVersion = role.appVersion; + log.debug("Container = {}, app version current = {} new = {}", + containerId, currentAppVersion, appVersion); + if (currentAppVersion == null + || currentAppVersion.equals(APP_VERSION_UNKNOWN)) { + amState.getOwnedContainer(containerId).appVersion = appVersion; + } + } + } catch (NoSuchNodeException e) { + // ignore - there is nothing to do if we don't find a container + log.warn("Owned container {} not found - {}", containerId, e); + } + } + + /** + * Handle heartbeat response from agents + * + * @param heartBeat incoming heartbeat from Agent + * + * @return response to send back + */ + @Override + public HeartBeatResponse handleHeartBeat(HeartBeat heartBeat) { + log.debug("Handling heartbeat: {}", heartBeat); + HeartBeatResponse response = new HeartBeatResponse(); + long id = heartBeat.getResponseId(); + response.setResponseId(id + 1L); + + String label = heartBeat.getHostname(); + String pkg = heartBeat.getPackage(); + + log.debug("package received: " + pkg); + + String roleName = getRoleName(label); + String roleGroup = getRoleGroup(label); + String containerId = getContainerId(label); + boolean doUpgrade = false; + if (isInUpgradeMode && upgradeContainers.contains(containerId)) { + doUpgrade = true; + } + + StateAccessForProviders accessor = getAmState(); + CommandScript cmdScript = getScriptPathForMasterPackage(roleGroup); + List commands = getMetaInfo().getApplicationComponent(roleGroup).getCommands(); + + if (!isDockerContainer(roleGroup) && !isYarnDockerContainer(roleGroup) + && (cmdScript == null || cmdScript.getScript() == null) + && commands.size() == 0) { + log.error( + "role.script is unavailable for {}. Commands will not be sent.", + roleName); + return response; + } + + String scriptPath = null; + long timeout = 600L; + if (cmdScript != null) { + scriptPath = cmdScript.getScript(); + timeout = cmdScript.getTimeout(); + } + + if (timeout == 0L) { + timeout = 600L; + } + + if (!getComponentStatuses().containsKey(label)) { + // container is completed but still heart-beating, send terminate signal + log.info( + "Sending terminate signal to completed container (still heartbeating): {}", + label); + response.setTerminateAgent(true); + return response; + } + + List statuses = heartBeat.getComponentStatus(); + if (statuses != null && !statuses.isEmpty()) { + log.info("status from agent: " + statuses.toString()); + try { + for(ComponentStatus status : statuses){ + RoleInstance role = null; + if(status.getIp() != null && !status.getIp().isEmpty()){ + role = amState.getOwnedContainer(containerId); + role.ip = status.getIp(); + } + if(status.getHostname() != null && !status.getHostname().isEmpty()){ + role = amState.getOwnedContainer(containerId); + role.hostname = status.getHostname(); + } + if (role != null) { + // create an updated service record (including hostname and ip) and publish... + ServiceRecord record = new ServiceRecord(); + record.set(YarnRegistryAttributes.YARN_ID, containerId); + record.description = roleName; + record.set(YarnRegistryAttributes.YARN_PERSISTENCE, + PersistencePolicies.CONTAINER); + // TODO: switch record attributes to use constants from YarnRegistryAttributes + // when it's been updated. + if (role.ip != null) { + record.set("yarn:ip", role.ip); + } + if (role.hostname != null) { + record.set("yarn:hostname", role.hostname); + } + yarnRegistry.putComponent( + RegistryPathUtils.encodeYarnID(containerId), record); + + } + } + + + } catch (NoSuchNodeException e) { + // ignore - there is nothing to do if we don't find a container + log.warn("Owned container {} not found - {}", containerId, e); + } catch (IOException e) { + log.warn("Error updating container {} service record in registry", + containerId, e); + } + } + + Boolean isMaster = isMaster(roleGroup); + ComponentInstanceState componentStatus = getComponentStatuses().get(label); + componentStatus.heartbeat(System.currentTimeMillis()); + if (doUpgrade) { + switch (componentStatus.getState()) { + case STARTED: + componentStatus.setTargetState(State.UPGRADED); + break; + case UPGRADED: + componentStatus.setTargetState(State.STOPPED); + break; + case STOPPED: + componentStatus.setTargetState(State.TERMINATING); + break; + default: + break; + } + log.info("Current state = {} target state {}", + componentStatus.getState(), componentStatus.getTargetState()); + } + + if (appStopInitiated && !componentStatus.isStopInitiated()) { + log.info("Stop initiated for label {}", label); + componentStatus.setTargetState(State.STOPPED); + componentStatus.setStopInitiated(true); + } + + publishConfigAndExportGroups(heartBeat, componentStatus, roleGroup); + CommandResult result = null; + List reports = heartBeat.getReports(); + if (SliderUtils.isNotEmpty(reports)) { + CommandReport report = reports.get(0); + Map ports = report.getAllocatedPorts(); + if (SliderUtils.isNotEmpty(ports)) { + processAllocatedPorts(heartBeat.getFqdn(), roleName, roleGroup, containerId, ports); + } + result = CommandResult.getCommandResult(report.getStatus()); + Command command = Command.getCommand(report.getRoleCommand()); + componentStatus.applyCommandResult(result, command, pkg); + log.info("Component operation. Status: {}; new container state: {};" + + " new component state: {}", result, + componentStatus.getContainerState(), componentStatus.getState()); + + if (command == Command.INSTALL && SliderUtils.isNotEmpty(report.getFolders())) { + publishFolderPaths(report.getFolders(), containerId, roleName, heartBeat.getFqdn()); + } + } + + int waitForCount = accessor.getInstanceDefinitionSnapshot(). + getAppConfOperations().getComponentOptInt(roleGroup, AgentKeys.WAIT_HEARTBEAT, 0); + + if (id < waitForCount) { + log.info("Waiting until heartbeat count {}. Current val: {}", waitForCount, id); + getComponentStatuses().put(label, componentStatus); + return response; + } + + Command command = componentStatus.getNextCommand(doUpgrade); + try { + if (Command.NOP != command) { + log.debug("For comp {} pkg {} issuing {}", roleName, + componentStatus.getNextPkgToInstall(), command.toString()); + if (command == Command.INSTALL) { + log.info("Installing {} on {}.", roleName, containerId); + if (isDockerContainer(roleGroup) || isYarnDockerContainer(roleGroup)){ + addInstallDockerCommand(roleName, roleGroup, containerId, + response, null, timeout); + } else if (scriptPath != null) { + addInstallCommand(roleName, roleGroup, containerId, response, + scriptPath, null, timeout, null); + } else { + // commands + ComponentCommand installCmd = null; + for (ComponentCommand compCmd : commands) { + if (compCmd.getName().equals("INSTALL")) { + installCmd = compCmd; + } + } + addInstallCommand(roleName, roleGroup, containerId, response, null, + installCmd, timeout, null); + } + componentStatus.commandIssued(command); + } else if (command == Command.INSTALL_ADDON) { + String nextPkgToInstall = componentStatus.getNextPkgToInstall(); + // retrieve scriptPath or command of that package for the component + for (ComponentsInAddonPackage comp : packageMetainfo + .get(nextPkgToInstall).getApplicationPackage().getComponents()) { + // given nextPkgToInstall and roleName is determined, the if below + // should only execute once per heartbeat + log.debug("Addon component: {} pkg: {} script: {}", comp.getName(), + nextPkgToInstall, comp.getCommandScript().getScript()); + if (comp.getName().equals(roleGroup) + || comp.getName().equals(AgentKeys.ADDON_FOR_ALL_COMPONENTS)) { + scriptPath = comp.getCommandScript().getScript(); + if (scriptPath != null) { + addInstallCommand(roleName, roleGroup, containerId, response, + scriptPath, null, timeout, nextPkgToInstall); + } else { + ComponentCommand installCmd = null; + for (ComponentCommand compCmd : comp.getCommands()) { + if (compCmd.getName().equals("INSTALL")) { + installCmd = compCmd; + } + } + addInstallCommand(roleName, roleGroup, containerId, response, + null, installCmd, timeout, nextPkgToInstall); + } + } + } + componentStatus.commandIssued(command); + } else if (command == Command.START) { + // check against dependencies + boolean canExecute = commandOrder.canExecute(roleGroup, command, getComponentStatuses().values()); + if (canExecute) { + log.info("Starting {} on {}.", roleName, containerId); + if (isDockerContainer(roleGroup) || isYarnDockerContainer(roleGroup)){ + addStartDockerCommand(roleName, roleGroup, containerId, + response, null, timeout, false); + } else if (scriptPath != null) { + addStartCommand(roleName, + roleGroup, + containerId, + response, + scriptPath, + null, + null, + timeout, + isMarkedAutoRestart(roleGroup)); + } else { + ComponentCommand startCmd = null; + for (ComponentCommand compCmd : commands) { + if (compCmd.getName().equals("START")) { + startCmd = compCmd; + } + } + ComponentCommand stopCmd = null; + for (ComponentCommand compCmd : commands) { + if (compCmd.getName().equals("STOP")) { + stopCmd = compCmd; + } + } + addStartCommand(roleName, roleGroup, containerId, response, null, + startCmd, stopCmd, timeout, false); + } + componentStatus.commandIssued(command); + } else { + log.info("Start of {} on {} delayed as dependencies have not started.", roleName, containerId); + } + } else if (command == Command.UPGRADE) { + addUpgradeCommand(roleName, roleGroup, containerId, response, + scriptPath, timeout); + componentStatus.commandIssued(command, true); + } else if (command == Command.STOP) { + log.info("Stop command being sent to container with id {}", + containerId); + addStopCommand(roleName, roleGroup, containerId, response, scriptPath, + timeout, doUpgrade); + componentStatus.commandIssued(command); + } else if (command == Command.TERMINATE) { + log.info("A formal terminate command is being sent to container {}" + + " in state {}", label, componentStatus.getState()); + response.setTerminateAgent(true); + } + } + + // if there is no outstanding command then retrieve config + if (isMaster && componentStatus.getState() == State.STARTED + && command == Command.NOP) { + if (!componentStatus.getConfigReported()) { + log.info("Requesting applied config for {} on {}.", roleName, containerId); + if (isDockerContainer(roleGroup) || isYarnDockerContainer(roleGroup)){ + addGetConfigDockerCommand(roleName, roleGroup, containerId, response); + } else { + addGetConfigCommand(roleName, roleGroup, containerId, response); + } + } + } + + // if restart is required then signal + response.setRestartEnabled(false); + if (componentStatus.getState() == State.STARTED + && command == Command.NOP && isMarkedAutoRestart(roleGroup)) { + response.setRestartEnabled(true); + } + + //If INSTALL_FAILED and no INSTALL is scheduled let the agent fail + if (componentStatus.getState() == State.INSTALL_FAILED + && command == Command.NOP) { + log.warn("Sending terminate signal to container that failed installation: {}", label); + response.setTerminateAgent(true); + } + + } catch (SliderException e) { + log.warn("Component instance failed operation.", e); + componentStatus.applyCommandResult(CommandResult.FAILED, command, null); + } + + log.debug("Heartbeat response: " + response); + return response; + } + + private boolean isDockerContainer(String roleGroup) { + String type = getMetaInfo().getApplicationComponent(roleGroup).getType(); + if (SliderUtils.isSet(type)) { + return type.toLowerCase().equals(SliderUtils.DOCKER) || type.toLowerCase().equals(SliderUtils.DOCKER_YARN); + } + return false; + } + + private boolean isYarnDockerContainer(String roleGroup) { + String type = getMetaInfo().getApplicationComponent(roleGroup).getType(); + if (SliderUtils.isSet(type)) { + return type.toLowerCase().equals(SliderUtils.DOCKER_YARN); + } + return false; + } + + protected void processAllocatedPorts(String fqdn, + String roleName, + String roleGroup, + String containerId, + Map ports) { + RoleInstance instance; + try { + instance = getAmState().getOwnedContainer(containerId); + } catch (NoSuchNodeException e) { + log.warn("Failed to locate instance of container {}", containerId, e); + instance = null; + } + for (Map.Entry port : ports.entrySet()) { + String portname = port.getKey(); + String portNo = port.getValue(); + log.info("Recording allocated port for {} as {}", portname, portNo); + + // add the allocated ports to the global list as well as per container list + // per container allocation will over-write each other in the global + this.getAllocatedPorts().put(portname, portNo); + this.getAllocatedPorts(containerId).put(portname, portNo); + if (instance != null) { + try { + // if the returned value is not a single port number then there are no + // meaningful way for Slider to use it during export + // No need to error out as it may not be the responsibility of the component + // to allocate port or the component may need an array of ports + instance.registerPortEndpoint(Integer.valueOf(portNo), portname); + } catch (NumberFormatException e) { + log.warn("Failed to parse {}", portNo, e); + } + } + } + + processAndPublishComponentSpecificData(ports, containerId, fqdn, roleGroup); + processAndPublishComponentSpecificExports(ports, containerId, fqdn, roleName, roleGroup); + + // and update registration entries + if (instance != null) { + queueAccess.put(new RegisterComponentInstance(instance.getId(), + roleName, roleGroup, 0, TimeUnit.MILLISECONDS)); + } + } + + private void updateComponentStatusWithAgentState( + ComponentInstanceState componentStatus, State agentState) { + if (agentState != null) { + componentStatus.setState(agentState); + } + } + + @Override + public Map buildMonitorDetails(ClusterDescription clusterDesc) { + Map details = super.buildMonitorDetails(clusterDesc); + buildRoleHostDetails(details); + return details; + } + + @Override + public void applyInitialRegistryDefinitions(URL amWebURI, + URL agentOpsURI, + URL agentStatusURI, + ServiceRecord serviceRecord) + throws IOException { + super.applyInitialRegistryDefinitions(amWebURI, + agentOpsURI, + agentStatusURI, + serviceRecord); + + try { + URL restURL = new URL(agentOpsURI, SLIDER_PATH_AGENTS); + URL agentStatusURL = new URL(agentStatusURI, SLIDER_PATH_AGENTS); + + serviceRecord.addInternalEndpoint( + new Endpoint(CustomRegistryConstants.AGENT_SECURE_REST_API, + ProtocolTypes.PROTOCOL_REST, + restURL.toURI())); + serviceRecord.addInternalEndpoint( + new Endpoint(CustomRegistryConstants.AGENT_ONEWAY_REST_API, + ProtocolTypes.PROTOCOL_REST, + agentStatusURL.toURI())); + } catch (URISyntaxException e) { + throw new IOException(e); + } + + // identify client component + Component client = null; + for (Component component : getMetaInfo().getApplication().getComponents()) { + if (component.getCategory().equals("CLIENT")) { + client = component; + break; + } + } + if (client == null) { + log.info("No client component specified, not publishing client configs"); + return; + } + + // register AM-generated client configs + ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); + MapOperations clientOperations = appConf.getOrAddComponent(client.getName()); + appConf.resolve(); + if (!clientOperations.getOptionBool(AgentKeys.AM_CONFIG_GENERATION, + false)) { + log.info("AM config generation is false, not publishing client configs"); + return; + } + + // build and localize configuration files + Map> configurations = new TreeMap>(); + Map tokens = null; + try { + tokens = getStandardTokenMap(appConf, client.getName(), client.getName()); + } catch (SliderException e) { + throw new IOException(e); + } + + for (ConfigFile configFile : getMetaInfo() + .getComponentConfigFiles(client.getName())) { + addNamedConfiguration(configFile.getDictionaryName(), + appConf.getGlobalOptions().options, configurations, tokens, null, + client.getName()); + if (appConf.getComponent(client.getName()) != null) { + addNamedConfiguration(configFile.getDictionaryName(), + appConf.getComponent(client.getName()).options, configurations, + tokens, null, client.getName()); + } + } + + //do a final replacement of re-used configs + dereferenceAllConfigs(configurations); + + for (ConfigFile configFile : getMetaInfo() + .getComponentConfigFiles(client.getName())) { + ConfigFormat configFormat = ConfigFormat.resolve(configFile.getType()); + + Map config = configurations.get(configFile.getDictionaryName()); + ConfigUtils.prepConfigForTemplateOutputter(configFormat, config, + fileSystem, getClusterName(), + new File(configFile.getFileName()).getName()); + PublishedConfiguration publishedConfiguration = + new PublishedConfiguration(configFile.getDictionaryName(), + config.entrySet()); + getAmState().getPublishedSliderConfigurations().put( + configFile.getDictionaryName(), publishedConfiguration); + log.info("Publishing AM configuration {}", configFile.getDictionaryName()); + } + } + + @Override + public void notifyContainerCompleted(ContainerId containerId) { + // containers get allocated and free'ed without being assigned to any + // component - so many of the data structures may not be initialized + if (containerId != null) { + String containerIdStr = containerId.toString(); + if (getComponentInstanceData().containsKey(containerIdStr)) { + getComponentInstanceData().remove(containerIdStr); + log.info("Removing container specific data for {}", containerIdStr); + publishComponentInstanceData(); + } + + if (this.allocatedPorts.containsKey(containerIdStr)) { + Map portsByContainerId = getAllocatedPorts(containerIdStr); + this.allocatedPorts.remove(containerIdStr); + // free up the allocations from global as well + // if multiple containers allocate global ports then last one + // wins and similarly first one removes it - its not supported anyway + for(String portName : portsByContainerId.keySet()) { + getAllocatedPorts().remove(portName); + } + + } + + String componentName = null; + synchronized (this.componentStatuses) { + for (String label : getComponentStatuses().keySet()) { + if (label.startsWith(containerIdStr)) { + componentName = getRoleName(label); + log.info("Removing component status for label {}", label); + getComponentStatuses().remove(label); + } + } + } + + tags.releaseTag(componentName, containerIdStr); + + synchronized (this.containerExportsMap) { + Set containerExportSets = containerExportsMap.get(containerIdStr); + if (containerExportSets != null) { + for (String containerExportStr : containerExportSets) { + String[] parts = containerExportStr.split(":"); + Map> exportGroup = getCurrentExports(parts[0]); + List exports = exportGroup.get(parts[1]); + List exportToRemove = new ArrayList(); + for (ExportEntry export : exports) { + if (containerIdStr.equals(export.getContainerId())) { + exportToRemove.add(export); + } + } + exports.removeAll(exportToRemove); + } + log.info("Removing container exports for {}", containerIdStr); + containerExportsMap.remove(containerIdStr); + } + } + } + } + + /** + * Reads and sets the heartbeat monitoring interval. If bad value is provided then log it and set to default. + * + * @param instanceDefinition + */ + private void readAndSetHeartbeatMonitoringInterval(AggregateConf instanceDefinition) { + String hbMonitorInterval = instanceDefinition.getAppConfOperations(). + getGlobalOptions().getOption(AgentKeys.HEARTBEAT_MONITOR_INTERVAL, + Integer.toString(DEFAULT_HEARTBEAT_MONITOR_INTERVAL)); + try { + setHeartbeatMonitorInterval(Integer.parseInt(hbMonitorInterval)); + } catch (NumberFormatException e) { + log.warn( + "Bad value {} for {}. Defaulting to ", + hbMonitorInterval, + HEARTBEAT_MONITOR_INTERVAL, + DEFAULT_HEARTBEAT_MONITOR_INTERVAL); + } + } + + /** + * Reads and sets the heartbeat monitoring interval. If bad value is provided then log it and set to default. + * + * @param instanceDefinition + */ + private void initializeAgentDebugCommands(AggregateConf instanceDefinition) { + String launchParameterStr = instanceDefinition.getAppConfOperations(). + getGlobalOptions().getOption(AgentKeys.AGENT_INSTANCE_DEBUG_DATA, ""); + agentLaunchParameter = new AgentLaunchParameter(launchParameterStr); + } + + @VisibleForTesting + protected Map getLogFolderExports() { + return logFolderExports; + } + + @VisibleForTesting + protected Map getWorkFolderExports() { + return workFolderExports; + } + + @VisibleForTesting + protected Metainfo getMetaInfo() { + return this.metaInfo; + } + + @VisibleForTesting + protected Map getComponentStatuses() { + return componentStatuses; + } + + @VisibleForTesting + protected Metainfo getApplicationMetainfo(SliderFileSystem fileSystem, + String appDef, boolean addonPackage) throws IOException, + BadConfigException { + return AgentUtils.getApplicationMetainfo(fileSystem, appDef, addonPackage); + } + + @VisibleForTesting + protected Metainfo getApplicationMetainfo(SliderFileSystem fileSystem, + String appDef) throws IOException, BadConfigException { + return getApplicationMetainfo(fileSystem, appDef, false); + } + + @VisibleForTesting + protected void setHeartbeatMonitorInterval(int heartbeatMonitorInterval) { + this.heartbeatMonitorInterval = heartbeatMonitorInterval; + } + + public void setInUpgradeMode(boolean inUpgradeMode) { + this.isInUpgradeMode = inUpgradeMode; + } + + public void addUpgradeContainers(Set upgradeContainers) { + this.upgradeContainers.addAll(upgradeContainers); + } + + public void setAppStopInitiated(boolean appStopInitiated) { + this.appStopInitiated = appStopInitiated; + } + + /** + * Read all default configs + * + * @param fileSystem fs + * @param appDef app default path + * @param metainfo metadata + * + * @return configuration maps + * + * @throws IOException + */ + protected Map initializeDefaultConfigs(SliderFileSystem fileSystem, + String appDef, Metainfo metainfo) throws IOException { + Map defaultConfigMap = new HashMap<>(); + if (SliderUtils.isNotEmpty(metainfo.getApplication().getConfigFiles())) { + for (ConfigFile configFile : metainfo.getApplication().getConfigFiles()) { + DefaultConfig config = null; + try { + config = AgentUtils.getDefaultConfig(fileSystem, appDef, configFile.getDictionaryName() + ".xml"); + } catch (IOException e) { + log.warn("Default config file not found. Only the config as input during create will be applied for {}", + configFile.getDictionaryName()); + } + if (config != null) { + defaultConfigMap.put(configFile.getDictionaryName(), config); + } + } + } + + return defaultConfigMap; + } + + protected Map getDefaultConfigs() { + return defaultConfigs; + } + + private int getHeartbeatMonitorInterval() { + return this.heartbeatMonitorInterval; + } + + private String getClusterName() { + if (SliderUtils.isUnset(clusterName)) { + clusterName = getAmState().getInternalsSnapshot().get(OptionKeys.APPLICATION_NAME); + } + return clusterName; + } + + /** + * Publish a named property bag that may contain name-value pairs for app configurations such as hbase-site + * + * @param name + * @param description + * @param entries + */ + protected void publishApplicationInstanceData(String name, String description, + Iterable> entries) { + PublishedConfiguration pubconf = new PublishedConfiguration(); + pubconf.description = description; + pubconf.putValues(entries); + log.info("publishing {}", pubconf); + getAmState().getPublishedSliderConfigurations().put(name, pubconf); + } + + /** + * Get a list of all hosts for all role/container per role + * + * @return the map of role->node + */ + protected Map> getRoleClusterNodeMapping() { + return amState.getRoleClusterNodeMapping(); + } + + private String getContainerLabel(Container container, String role, String group) { + if (role.equals(group)) { + return container.getId().toString() + LABEL_MAKER + role; + } else { + return container.getId().toString() + LABEL_MAKER + role + LABEL_MAKER + + group; + } + } + + protected String getClusterInfoPropertyValue(String name) { + StateAccessForProviders accessor = getAmState(); + assert accessor.isApplicationLive(); + ClusterDescription description = accessor.getClusterStatus(); + return description.getInfo(name); + } + + protected String getClusterOptionPropertyValue(String name) + throws BadConfigException { + StateAccessForProviders accessor = getAmState(); + assert accessor.isApplicationLive(); + ClusterDescription description = accessor.getClusterStatus(); + return description.getMandatoryOption(name); + } + + /** + * Lost heartbeat from the container - release it and ask for a replacement (async operation) + * + * @param label + * @param containerId + */ + protected void lostContainer( + String label, + ContainerId containerId) { + getComponentStatuses().remove(label); + getQueueAccess().put(new ProviderReportedContainerLoss(containerId)); + } + + /** + * Build the provider status, can be empty + * + * @return the provider status - map of entries to add to the info section + */ + public Map buildProviderStatus() { + Map stats = new HashMap(); + return stats; + } + + + /** + * Format the folder locations and publish in the registry service + * + * @param folders + * @param containerId + * @param hostFqdn + * @param componentName + */ + protected void publishFolderPaths( + Map folders, String containerId, String componentName, String hostFqdn) { + Date now = new Date(); + for (Map.Entry entry : folders.entrySet()) { + ExportEntry exportEntry = new ExportEntry(); + exportEntry.setValue(String.format(HOST_FOLDER_FORMAT, hostFqdn, entry.getValue())); + exportEntry.setContainerId(containerId); + exportEntry.setLevel(COMPONENT_TAG); + exportEntry.setTag(componentName); + exportEntry.setUpdatedTime(now.toString()); + if (entry.getKey().equals("AGENT_LOG_ROOT")) { + synchronized (logFolderExports) { + getLogFolderExports().put(containerId, exportEntry); + } + } else { + synchronized (workFolderExports) { + getWorkFolderExports().put(containerId, exportEntry); + } + } + log.info("Updating log and pwd folders for container {}", containerId); + } + + PublishedExports exports = new PublishedExports(CONTAINER_LOGS_TAG); + exports.setUpdated(now.getTime()); + synchronized (logFolderExports) { + updateExportsFromList(exports, getLogFolderExports()); + } + getAmState().getPublishedExportsSet().put(CONTAINER_LOGS_TAG, exports); + + exports = new PublishedExports(CONTAINER_PWDS_TAG); + exports.setUpdated(now.getTime()); + synchronized (workFolderExports) { + updateExportsFromList(exports, getWorkFolderExports()); + } + getAmState().getPublishedExportsSet().put(CONTAINER_PWDS_TAG, exports); + } + + /** + * Update the export data from the map + * @param exports + * @param folderExports + */ + private void updateExportsFromList(PublishedExports exports, Map folderExports) { + Map> perComponentList = new HashMap>(); + for(Map.Entry logEntry : folderExports.entrySet()) + { + String componentName = logEntry.getValue().getTag(); + if (!perComponentList.containsKey(componentName)) { + perComponentList.put(componentName, new ArrayList()); + } + perComponentList.get(componentName).add(logEntry.getValue()); + } + exports.putValues(perComponentList.entrySet()); + } + + + /** + * Process return status for component instances + * + * @param heartBeat + * @param componentStatus + */ + protected void publishConfigAndExportGroups(HeartBeat heartBeat, + ComponentInstanceState componentStatus, String componentGroup) { + List statuses = heartBeat.getComponentStatus(); + if (statuses != null && !statuses.isEmpty()) { + log.info("Processing {} status reports.", statuses.size()); + for (ComponentStatus status : statuses) { + log.info("Status report: {}", status.toString()); + + if (status.getConfigs() != null) { + Application application = getMetaInfo().getApplication(); + + if ((!canAnyMasterPublishConfig() || canPublishConfig(componentGroup)) && + !getAmState().getAppConfSnapshot().getComponentOptBool( + componentGroup, AgentKeys.AM_CONFIG_GENERATION, false)) { + // If no Master can explicitly publish then publish if its a master + // Otherwise, wait till the master that can publish is ready + + Set exportedConfigs = new HashSet(); + String exportedConfigsStr = application.getExportedConfigs(); + boolean exportedAllConfigs = exportedConfigsStr == null || exportedConfigsStr.isEmpty(); + if (!exportedAllConfigs) { + for (String exportedConfig : exportedConfigsStr.split(",")) { + if (exportedConfig.trim().length() > 0) { + exportedConfigs.add(exportedConfig.trim()); + } + } + } + + for (String key : status.getConfigs().keySet()) { + if ((!exportedAllConfigs && exportedConfigs.contains(key)) || + exportedAllConfigs) { + Map configs = status.getConfigs().get(key); + publishApplicationInstanceData(key, key, configs.entrySet()); + } + } + } + + List appExportGroups = application.getExportGroups(); + boolean hasExportGroups = SliderUtils.isNotEmpty(appExportGroups); + + Set appExports = new HashSet(); + String appExportsStr = getApplicationComponent(componentGroup).getAppExports(); + if (SliderUtils.isSet(appExportsStr)) { + for (String appExport : appExportsStr.split(",")) { + if (!appExport.trim().isEmpty()) { + appExports.add(appExport.trim()); + } + } + } + + if (hasExportGroups && !appExports.isEmpty()) { + String configKeyFormat = "${site.%s.%s}"; + String hostKeyFormat = "${%s_HOST}"; + + // publish export groups if any + Map replaceTokens = new HashMap(); + for (Map.Entry> entry : getRoleClusterNodeMapping().entrySet()) { + String hostName = getHostsList(entry.getValue().values(), true).iterator().next(); + replaceTokens.put(String.format(hostKeyFormat, entry.getKey().toUpperCase(Locale.ENGLISH)), hostName); + } + + for (String key : status.getConfigs().keySet()) { + Map configs = status.getConfigs().get(key); + for (String configKey : configs.keySet()) { + String lookupKey = String.format(configKeyFormat, key, configKey); + replaceTokens.put(lookupKey, configs.get(configKey)); + } + } + + Set modifiedGroups = new HashSet(); + for (ExportGroup exportGroup : appExportGroups) { + List exports = exportGroup.getExports(); + if (SliderUtils.isNotEmpty(exports)) { + String exportGroupName = exportGroup.getName(); + ConcurrentHashMap> map = + (ConcurrentHashMap>)getCurrentExports(exportGroupName); + for (Export export : exports) { + if (canBeExported(exportGroupName, export.getName(), appExports)) { + String value = export.getValue(); + // replace host names + for (String token : replaceTokens.keySet()) { + if (value.contains(token)) { + value = value.replace(token, replaceTokens.get(token)); + } + } + ExportEntry entry = new ExportEntry(); + entry.setLevel(APPLICATION_TAG); + entry.setValue(value); + entry.setUpdatedTime(new Date().toString()); + // over-write, app exports are singletons + map.put(export.getName(), new ArrayList(Arrays.asList(entry))); + log.info("Preparing to publish. Key {} and Value {}", export.getName(), value); + } + } + modifiedGroups.add(exportGroupName); + } + } + publishModifiedExportGroups(modifiedGroups); + } + + log.info("Received and processed config for {}", heartBeat.getHostname()); + componentStatus.setConfigReported(true); + + } + } + } + } + + private boolean canBeExported(String exportGroupName, String name, Set appExports) { + return appExports.contains(String.format("%s-%s", exportGroupName, name)); + } + + protected Map> getCurrentExports(String groupName) { + if (!this.exportGroups.containsKey(groupName)) { + synchronized (this.exportGroups) { + if (!this.exportGroups.containsKey(groupName)) { + this.exportGroups.put(groupName, new ConcurrentHashMap>()); + } + } + } + + return this.exportGroups.get(groupName); + } + + private void publishModifiedExportGroups(Set modifiedGroups) { + for (String groupName : modifiedGroups) { + Map> entries = this.exportGroups.get(groupName); + + // Publish in old format for the time being + Map simpleEntries = new HashMap(); + for (Map.Entry> entry : entries.entrySet()) { + List exports = entry.getValue(); + if (SliderUtils.isNotEmpty(exports)) { + // there is no support for multiple exports per name - so extract only the first one + simpleEntries.put(entry.getKey(), entry.getValue().get(0).getValue()); + } + } + if (!getAmState().getAppConfSnapshot().getComponentOptBool( + groupName, AgentKeys.AM_CONFIG_GENERATION, false)) { + publishApplicationInstanceData(groupName, groupName, + simpleEntries.entrySet()); + } + + PublishedExports exports = new PublishedExports(groupName); + exports.setUpdated(new Date().getTime()); + exports.putValues(entries.entrySet()); + getAmState().getPublishedExportsSet().put(groupName, exports); + } + } + + /** Publish component instance specific data if the component demands it */ + protected void processAndPublishComponentSpecificData(Map ports, + String containerId, + String hostFqdn, + String componentGroup) { + String portVarFormat = "${site.%s}"; + String hostNamePattern = "${THIS_HOST}"; + Map toPublish = new HashMap(); + + Application application = getMetaInfo().getApplication(); + for (Component component : application.getComponents()) { + if (component.getName().equals(componentGroup)) { + if (component.getComponentExports().size() > 0) { + + for (ComponentExport export : component.getComponentExports()) { + String templateToExport = export.getValue(); + for (String portName : ports.keySet()) { + boolean publishData = false; + String portValPattern = String.format(portVarFormat, portName); + if (templateToExport.contains(portValPattern)) { + templateToExport = templateToExport.replace(portValPattern, ports.get(portName)); + publishData = true; + } + if (templateToExport.contains(hostNamePattern)) { + templateToExport = templateToExport.replace(hostNamePattern, hostFqdn); + publishData = true; + } + if (publishData) { + toPublish.put(export.getName(), templateToExport); + log.info("Publishing {} for name {} and container {}", + templateToExport, export.getName(), containerId); + } + } + } + } + } + } + + if (toPublish.size() > 0) { + Map perContainerData = null; + if (!getComponentInstanceData().containsKey(containerId)) { + perContainerData = new ConcurrentHashMap(); + } else { + perContainerData = getComponentInstanceData().get(containerId); + } + perContainerData.putAll(toPublish); + getComponentInstanceData().put(containerId, perContainerData); + publishComponentInstanceData(); + } + } + + /** Publish component instance specific data if the component demands it */ + protected void processAndPublishComponentSpecificExports(Map ports, + String containerId, + String hostFqdn, + String compName, + String compGroup) { + String portVarFormat = "${site.%s}"; + String hostNamePattern = "${" + compGroup + "_HOST}"; + + List appExportGroups = getMetaInfo().getApplication().getExportGroups(); + Component component = getMetaInfo().getApplicationComponent(compGroup); + if (component != null && SliderUtils.isSet(component.getCompExports()) + && SliderUtils.isNotEmpty(appExportGroups)) { + + Set compExports = new HashSet(); + String compExportsStr = component.getCompExports(); + for (String compExport : compExportsStr.split(",")) { + if (!compExport.trim().isEmpty()) { + compExports.add(compExport.trim()); + } + } + + Date now = new Date(); + Set modifiedGroups = new HashSet(); + for (ExportGroup exportGroup : appExportGroups) { + List exports = exportGroup.getExports(); + if (SliderUtils.isNotEmpty(exports)) { + String exportGroupName = exportGroup.getName(); + ConcurrentHashMap> map = + (ConcurrentHashMap>) getCurrentExports(exportGroupName); + for (Export export : exports) { + if (canBeExported(exportGroupName, export.getName(), compExports)) { + log.info("Attempting to publish {} of group {} for component type {}", + export.getName(), exportGroupName, compName); + String templateToExport = export.getValue(); + for (String portName : ports.keySet()) { + boolean publishData = false; + String portValPattern = String.format(portVarFormat, portName); + if (templateToExport.contains(portValPattern)) { + templateToExport = templateToExport.replace(portValPattern, ports.get(portName)); + publishData = true; + } + if (templateToExport.contains(hostNamePattern)) { + templateToExport = templateToExport.replace(hostNamePattern, hostFqdn); + publishData = true; + } + if (publishData) { + ExportEntry entryToAdd = new ExportEntry(); + entryToAdd.setLevel(COMPONENT_TAG); + entryToAdd.setValue(templateToExport); + entryToAdd.setUpdatedTime(now.toString()); + entryToAdd.setContainerId(containerId); + entryToAdd.setTag(tags.getTag(compName, containerId)); + + List existingList = + map.putIfAbsent(export.getName(), new CopyOnWriteA --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org