From: jianhe@apache.org
To: common-commits@hadoop.apache.org
Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm
Date: Fri, 10 Feb 2017 02:34:45 -0000
Message-Id: <3dfb884439184fe6a56b1226e918e97f@git.apache.org>
In-Reply-To: <73445c0653c347f6b755198ac09be354@git.apache.org>
References: <73445c0653c347f6b755198ac09be354@git.apache.org>
Subject: [03/50] [abbrv] hadoop git commit: YARN-5505. Create an agent-less docker provider in the native-services framework. Contributed by Billie Rinaldi

YARN-5505. Create an agent-less docker provider in the native-services framework.
Contributed by Billie Rinaldi


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bb860960
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bb860960
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bb860960

Branch: refs/heads/yarn-native-services
Commit: bb86096026dfd43f2544c0e870857a01e587b470
Parents: 4224575
Author: Jian He
Authored: Thu Sep 1 22:38:42 2016 +0800
Committer: Jian He
Committed: Thu Feb 9 16:58:05 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/slider/api/OptionKeys.java  |   15 +-
 .../org/apache/slider/client/SliderClient.java  |   17 +-
 .../org/apache/slider/common/SliderKeys.java    |   22 +-
 .../apache/slider/common/tools/SliderUtils.java |    4 +
 .../slider/core/launch/AbstractLauncher.java    |   18 +-
 .../PublishedConfigurationOutputter.java        |    6 +-
 .../providers/AbstractClientProvider.java       |    4 +-
 .../providers/AbstractProviderService.java      |   22 +-
 .../slider/providers/ProviderService.java       |   12 +-
 .../apache/slider/providers/ProviderUtils.java  | 1391 ++++++++++++++----
 .../providers/agent/AgentClientProvider.java    |   36 +-
 .../slider/providers/agent/AgentKeys.java       |   12 +-
 .../providers/agent/AgentProviderService.java   |  705 ++-------
 .../providers/docker/DockerClientProvider.java  |   96 ++
 .../slider/providers/docker/DockerKeys.java     |   32 +
 .../providers/docker/DockerProviderFactory.java |   43 +
 .../providers/docker/DockerProviderService.java |  355 +++++
 .../slideram/SliderAMProviderService.java       |    4 -
 .../server/appmaster/SliderAppMaster.java       |   39 +-
 .../main/resources/org/apache/slider/slider.xml |    4 +
 .../slider/providers/docker/appConfig.json      |   42 +
 .../slider/providers/docker/resources.json      |   16 +
 .../slider/providers/docker/test.template       |   16 +
 23 files changed, 1971 insertions(+), 940 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb860960/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/OptionKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/OptionKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/OptionKeys.java
index a035a99..434b1d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/OptionKeys.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/OptionKeys.java
@@ -41,7 +41,20 @@ public interface OptionKeys extends InternalKeys {
    * Prefix for site.xml options: {@value}
    */
   String SITE_XML_PREFIX = "site.";
-
+  /**
+   * Prefix for config file options: {@value}
+   */
+  String CONF_FILE_PREFIX = "conf.";
+  /**
+   * Prefix for package options: {@value}
+   */
+  String PKG_FILE_PREFIX = "pkg.";
+  /**
+   * Prefix for export options: {@value}
+   */
+  String EXPORT_PREFIX = "export.";
+  String TYPE_SUFFIX = ".type";
+  String NAME_SUFFIX = ".name";
   /**
    * Zookeeper quorum host list: {@value}
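The new OptionKeys constants compose into appConfig option names such as conf.<id>.type and conf.<id>.name, which the provider code later in this patch reads to decide which config files to generate and localize. Below is a minimal standalone sketch of that naming convention; the option map and config name are hypothetical, and the real lookup goes through ConfTreeOperations.getComponentOpt in ProviderUtils rather than a plain map scan.

import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative only: shows how "conf.<id>.type" and "conf.<id>.name"
 * pairs describe a config file to be generated for a component.
 */
public class ConfFilePrefixExample {
  static final String CONF_FILE_PREFIX = "conf.";
  static final String TYPE_SUFFIX = ".type";
  static final String NAME_SUFFIX = ".name";

  public static void main(String[] args) {
    // Hypothetical component options as they might appear in appConfig.json
    Map<String, String> options = new HashMap<>();
    options.put(CONF_FILE_PREFIX + "hbase-site" + TYPE_SUFFIX, "xml");
    options.put(CONF_FILE_PREFIX + "hbase-site" + NAME_SUFFIX, "hbase-site.xml");

    for (Map.Entry<String, String> e : options.entrySet()) {
      String key = e.getKey();
      if (key.startsWith(CONF_FILE_PREFIX) && key.endsWith(TYPE_SUFFIX)) {
        // Strip the prefix and suffix to recover the config id
        String id = key.substring(CONF_FILE_PREFIX.length(),
            key.length() - TYPE_SUFFIX.length());
        String fileName = options.get(CONF_FILE_PREFIX + id + NAME_SUFFIX);
        System.out.println("config " + id + ": format=" + e.getValue()
            + ", file=" + fileName);
      }
    }
  }
}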
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb860960/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
index 3129f6f..5096bb7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -151,7 +151,6 @@ import org.apache.slider.core.registry.YarnAppListClient;
 import org.apache.slider.core.registry.docstore.ConfigFormat;
 import org.apache.slider.core.registry.docstore.PublishedConfigSet;
 import org.apache.slider.core.registry.docstore.PublishedConfiguration;
-import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
 import org.apache.slider.core.registry.docstore.PublishedExports;
 import org.apache.slider.core.registry.docstore.PublishedExportsOutputter;
 import org.apache.slider.core.registry.docstore.PublishedExportsSet;
@@ -162,6 +161,7 @@ import org.apache.slider.core.zk.ZKPathBuilder;
 import org.apache.slider.providers.AbstractClientProvider;
 import org.apache.slider.providers.SliderProviderFactory;
 import org.apache.slider.providers.agent.AgentKeys;
+import org.apache.slider.providers.docker.DockerClientProvider;
 import org.apache.slider.providers.slideram.SliderAMClientProvider;
 import org.apache.slider.server.appmaster.SliderAppMaster;
 import org.apache.slider.server.appmaster.rpc.RpcBinder;
@@ -2081,7 +2081,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
     // add the tags if available
     Set applicationTags = provider.getApplicationTags(sliderFileSystem,
-        getApplicationDefinitionPath(appOperations));
+        appOperations);
 
     Credentials credentials = null;
     if (clusterSecure) {
@@ -2242,11 +2242,12 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
     );
 
-    // TODO: consider supporting apps that don't have an image path
-    Path imagePath =
-        extractImagePath(sliderFileSystem, internalOptions);
-    if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) {
-      log.debug("Registered image path {}", imagePath);
+    if (!(provider instanceof DockerClientProvider)) {
+      Path imagePath =
+          extractImagePath(sliderFileSystem, internalOptions);
+      if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) {
+        log.debug("Registered image path {}", imagePath);
+      }
     }
 
     // build the environment
@@ -3814,7 +3815,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
       Path subPath = new Path(path1, appReport.getApplicationId()
           .toString() + "/agent");
       imagePath = subPath.toString();
-      String pathStr = imagePath + "/" + AGENT_TAR;
+      String pathStr = imagePath + "/" + AgentKeys.AGENT_TAR;
       try {
         validateHDFSFile(sliderFileSystem, pathStr);
         log.info("Slider agent package is properly installed at " + pathStr);
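The SliderClient change above skips localizing an application image for the agent-less Docker provider; further down in this patch, AbstractLauncher wires the Docker settings into the environment variables understood by the YARN Docker container runtime. A minimal sketch of that mapping follows. The variable names are taken verbatim from the diff; the surrounding launcher plumbing is simplified to a plain method with hypothetical values and is not the actual AbstractLauncher code.

import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the environment variables a launcher sets so that the YARN
 * Docker container runtime starts the container in the requested image,
 * network, and privilege mode.
 */
public class DockerEnvSketch {
  public static Map<String, String> dockerEnv(String image, String network,
      boolean privileged) {
    Map<String, String> env = new HashMap<>();
    env.put("YARN_CONTAINER_RUNTIME_TYPE", "docker");
    env.put("YARN_CONTAINER_RUNTIME_DOCKER_IMAGE", image);
    env.put("YARN_CONTAINER_RUNTIME_DOCKER_CONTAINER_NETWORK", network);
    env.put("YARN_CONTAINER_RUNTIME_DOCKER_RUN_PRIVILEGED_CONTAINER",
        Boolean.toString(privileged));
    return env;
  }

  public static void main(String[] args) {
    // Hypothetical values; in the patched code the network default comes
    // from DockerKeys.DEFAULT_DOCKER_NETWORK.
    System.out.println(dockerEnv("centos:7", "bridge", false));
  }
}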
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb860960/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/SliderKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/SliderKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/SliderKeys.java index 120b1fc..1484ee3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/SliderKeys.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/SliderKeys.java @@ -81,6 +81,10 @@ public interface SliderKeys extends SliderXmlConfKeys { String COMPONENT_SEPARATOR = "-"; String[] COMPONENT_KEYS_TO_SKIP = {"zookeeper.", "env.MALLOC_ARENA_MAX", "site.fs.", "site.dfs."}; + /** + * A component type for a client component + */ + String COMPONENT_TYPE_CLIENT = "client"; /** * Key for application version. This must be set in app_config/global {@value} @@ -222,7 +226,6 @@ public interface SliderKeys extends SliderXmlConfKeys { String SLIDER_JAR = "slider.jar"; String JCOMMANDER_JAR = "jcommander.jar"; String GSON_JAR = "gson.jar"; - String AGENT_TAR = "slider-agent.tar.gz"; String DEFAULT_APP_PKG = "appPkg.zip"; String DEFAULT_JVM_HEAP = "256M"; @@ -288,4 +291,21 @@ public interface SliderKeys extends SliderXmlConfKeys { String SLIDER_CLASSPATH_EXTRA = "SLIDER_CLASSPATH_EXTRA"; String YARN_CONTAINER_PATH = "/node/container/"; + + String GLOBAL_CONFIG_TAG = "global"; + String SYSTEM_CONFIGS = "system_configs"; + String JAVA_HOME = "java_home"; + String TWO_WAY_SSL_ENABLED = "ssl.server.client.auth"; + String INFRA_RUN_SECURITY_DIR = "infra/run/security/"; + String CERT_FILE_LOCALIZATION_PATH = INFRA_RUN_SECURITY_DIR + "ca.crt"; + + String AM_CONFIG_GENERATION = "am.config.generation"; + String APP_CONF_DIR = "app/conf"; + + String APP_RESOURCES = "application.resources"; + String APP_RESOURCES_DIR = "app/resources"; + String PER_COMPONENT = "per.component"; + String PER_GROUP = "per.group"; + + String APP_PACKAGES_DIR = "app/packages"; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb860960/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java index e9f65ba..f773982 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java @@ -183,6 +183,10 @@ public final class SliderUtils { return !isUnset(s); } + public static boolean 
isEmpty(List l) { + return l == null || l.isEmpty(); + } + /** * Probe for a list existing and not being empty * @param l list http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb860960/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java index 5a3eb3d..aefc0de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java @@ -52,6 +52,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import static org.apache.slider.providers.docker.DockerKeys.DEFAULT_DOCKER_NETWORK; + /** * Launcher of applications: base class */ @@ -79,6 +81,7 @@ public abstract class AbstractLauncher extends Configured { protected LogAggregationContext logAggregationContext; protected boolean yarnDockerMode = false; protected String dockerImage; + protected String dockerNetwork = DEFAULT_DOCKER_NETWORK; protected String yarnContainerMountPoints; protected String runPrivilegedContainer; @@ -232,7 +235,8 @@ public abstract class AbstractLauncher extends Configured { if(yarnDockerMode){ Map env = containerLaunchContext.getEnvironment(); env.put("YARN_CONTAINER_RUNTIME_TYPE", "docker"); - env.put("YARN_CONTAINER_RUNTIME_DOCKER_IMAGE", dockerImage);//if yarnDockerMode, then dockerImage is set + env.put("YARN_CONTAINER_RUNTIME_DOCKER_IMAGE", dockerImage); + env.put("YARN_CONTAINER_RUNTIME_DOCKER_CONTAINER_NETWORK", dockerNetwork); env.put("YARN_CONTAINER_RUNTIME_DOCKER_RUN_PRIVILEGED_CONTAINER", runPrivilegedContainer); StringBuilder sb = new StringBuilder(); for (Entry mount : mountPaths.entrySet()) { @@ -517,6 +521,10 @@ public abstract class AbstractLauncher extends Configured { this.dockerImage = dockerImage; } + public void setDockerNetwork(String dockerNetwork) { + this.dockerNetwork = dockerNetwork; + } + public void setYarnContainerMountPoints(String yarnContainerMountPoints) { this.yarnContainerMountPoints = yarnContainerMountPoints; } @@ -525,4 +533,12 @@ public abstract class AbstractLauncher extends Configured { this.runPrivilegedContainer = runPrivilegedContainer; } + public void setRunPrivilegedContainer(boolean runPrivilegedContainer) { + if (runPrivilegedContainer) { + this.runPrivilegedContainer = Boolean.toString(true); + } else { + this.runPrivilegedContainer = Boolean.toString(false); + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb860960/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedConfigurationOutputter.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedConfigurationOutputter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedConfigurationOutputter.java index 9bdcfcb..4ec513c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedConfigurationOutputter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedConfigurationOutputter.java @@ -39,6 +39,8 @@ import java.util.Properties; */ public abstract class PublishedConfigurationOutputter { + private static final String COMMENTS = "Generated by Apache Slider"; + protected final PublishedConfiguration owner; protected PublishedConfigurationOutputter(PublishedConfiguration owner) { @@ -143,13 +145,13 @@ public abstract class PublishedConfigurationOutputter { @Override public void save(OutputStream out) throws IOException { - properties.store(out, ""); + properties.store(out, COMMENTS); } public String asString() throws IOException { StringWriter sw = new StringWriter(); - properties.store(sw, ""); + properties.store(sw, COMMENTS); return sw.toString(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb860960/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java index 510de5d..f59c347 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java @@ -216,8 +216,8 @@ public abstract class AbstractClientProvider extends Configured { * Return a set of application specific string tags. * @return the set of tags. 
*/ - public Set getApplicationTags (SliderFileSystem fileSystem, - String appDef) throws SliderException { + public Set getApplicationTags(SliderFileSystem fileSystem, + ConfTreeOperations appConf) throws SliderException { return Collections.emptySet(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb860960/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java index 92766f5..19fa07b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; @@ -139,6 +140,19 @@ public abstract class AbstractProviderService } /** + * Load default Configuration + * @param confDir configuration directory + * @return configuration + * @throws BadCommandArgumentsException + * @throws IOException + */ + @Override + public Configuration loadProviderConfigurationInformation(File confDir) + throws BadCommandArgumentsException, IOException { + return new Configuration(false); + } + + /** * Load a specific XML configuration file for the provider config * @param confDir configuration directory * @param siteXMLFilename provider-specific filename @@ -369,8 +383,6 @@ public abstract class AbstractProviderService @Override public void applyInitialRegistryDefinitions(URL amWebURI, - URL agentOpsURI, - URL agentStatusURI, ServiceRecord serviceRecord) throws IOException { this.amWebAPI = amWebURI; @@ -422,4 +434,10 @@ public abstract class AbstractProviderService public void rebuildContainerDetails(List liveContainers, String applicationId, Map providerRoles) { } + + @Override + public boolean processContainerStatus(ContainerId containerId, + ContainerStatus status) { + return false; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb860960/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java index 3f24665..b62510a 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java @@ -24,6 +24,7 @@ import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.slider.api.ClusterDescription; import org.apache.slider.common.tools.SliderFileSystem; import org.apache.slider.core.conf.AggregateConf; @@ -189,13 +190,9 @@ public interface ProviderService extends ProviderCore, /** * Prior to going live -register the initial service registry data * @param amWebURI URL to the AM. This may be proxied, so use relative paths - * @param agentOpsURI URI for agent operations. This will not be proxied - * @param agentStatusURI URI For agent status. Again: no proxy * @param serviceRecord service record to build up */ void applyInitialRegistryDefinitions(URL amWebURI, - URL agentOpsURI, - URL agentStatusURI, ServiceRecord serviceRecord) throws IOException; @@ -216,4 +213,11 @@ public interface ProviderService extends ProviderCore, */ void rebuildContainerDetails(List liveContainers, String applicationId, Map providerRoles); + + /** + * Process container status + * @return true if status needs to be requested again, false otherwise + */ + boolean processContainerStatus(ContainerId containerId, + ContainerStatus status); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb860960/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java index 07d106b..47556f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java @@ -18,16 +18,29 @@ package org.apache.slider.providers; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; +import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; +import org.apache.hadoop.util.StringUtils; +import 
org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.slider.api.ClusterDescription; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.slider.api.ClusterNode; import org.apache.slider.api.InternalKeys; import org.apache.slider.api.OptionKeys; import org.apache.slider.api.ResourceKeys; import org.apache.slider.api.RoleKeys; +import org.apache.slider.common.SliderExitCodes; import org.apache.slider.common.SliderKeys; +import org.apache.slider.common.SliderXmlConfKeys; import org.apache.slider.common.tools.SliderFileSystem; import org.apache.slider.common.tools.SliderUtils; import org.apache.slider.core.conf.AggregateConf; @@ -35,28 +48,50 @@ import org.apache.slider.core.conf.ConfTreeOperations; import org.apache.slider.core.conf.MapOperations; import org.apache.slider.core.exceptions.BadCommandArgumentsException; import org.apache.slider.core.exceptions.BadConfigException; +import org.apache.slider.core.exceptions.NoSuchNodeException; import org.apache.slider.core.exceptions.SliderException; -import org.apache.slider.core.exceptions.SliderInternalStateException; +import org.apache.slider.core.launch.ContainerLauncher; +import org.apache.slider.core.registry.docstore.ConfigFormat; +import org.apache.slider.core.registry.docstore.ConfigUtils; +import org.apache.slider.core.registry.docstore.ExportEntry; +import org.apache.slider.core.registry.docstore.PublishedConfiguration; +import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter; +import org.apache.slider.core.registry.docstore.PublishedExports; +import org.apache.slider.server.appmaster.state.RoleInstance; +import org.apache.slider.server.appmaster.state.StateAccessForProviders; +import org.apache.slider.server.services.security.CertificateManager; +import org.apache.slider.server.services.security.SecurityStore; +import org.apache.slider.server.services.security.StoresGenerator; +import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders; import org.slf4j.Logger; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.LinkedList; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; import java.util.regex.Pattern; /** - * this is a factoring out of methods handy for providers. It's bonded to a log at - * construction time + * This is a factoring out of methods handy for providers. It's bonded to a log + * at construction time. */ -public class ProviderUtils implements RoleKeys { +public class ProviderUtils implements RoleKeys, SliderKeys { protected final Logger log; /** - * Create an instace + * Create an instance * @param log log directory to use -usually the provider */ @@ -66,14 +101,14 @@ public class ProviderUtils implements RoleKeys { /** * Add oneself to the classpath. This does not work - * on minicluster test runs where the JAR is not built up + * on minicluster test runs where the JAR is not built up. 
* @param providerResources map of provider resources to add these entries to * @param provider provider to add * @param jarName name of the jar to use * @param sliderFileSystem target filesystem * @param tempPath path in the cluster FS for temp files * @param libdir relative directory to place resources - * @param miniClusterTestRun + * @param miniClusterTestRun true if minicluster is being used * @return true if the class was found in a JAR * * @throws FileNotFoundException if the JAR was not found and this is NOT @@ -81,7 +116,8 @@ public class ProviderUtils implements RoleKeys { * @throws IOException IO problems * @throws SliderException any Slider problem */ - public static boolean addProviderJar(Map providerResources, + public static boolean addProviderJar( + Map providerResources, Object provider, String jarName, SliderFileSystem sliderFileSystem, @@ -108,13 +144,14 @@ public class ProviderUtils implements RoleKeys { } /** - * Add/overwrite the agent tarball (overwritten every time application is restarted) - * @param provider - * @param tarName - * @param sliderFileSystem - * @param agentDir + * Add/overwrite the agent tarball (overwritten every time application is + * restarted). + * @param provider an instance of a provider class + * @param tarName name of the tarball to upload + * @param sliderFileSystem the file system + * @param agentDir directory to upload to * @return true the location could be determined and the file added - * @throws IOException + * @throws IOException if the upload fails */ public static boolean addAgentTar(Object provider, String tarName, @@ -125,100 +162,58 @@ public class ProviderUtils implements RoleKeys { if(localFile != null) { String parentDir = localFile.getParent(); Path agentTarPath = new Path(parentDir, tarName); - sliderFileSystem.getFileSystem().copyFromLocalFile(false, true, agentTarPath, agentDir); + sliderFileSystem.getFileSystem().copyFromLocalFile(false, true, + agentTarPath, agentDir); return true; } return false; } /** - * Add a set of dependencies to the provider resources being built up, - * by copying them from the local classpath to the remote one, then - * registering them - * @param providerResources map of provider resources to add these entries to - * @param sliderFileSystem target filesystem - * @param tempPath path in the cluster FS for temp files - * @param libdir relative directory to place resources - * @param resources list of resource names (e.g. "hbase.jar" - * @param classes list of classes where classes[i] refers to a class in - * resources[i] - * @throws IOException IO problems - * @throws SliderException any Slider problem - */ - public static void addDependencyJars(Map providerResources, - SliderFileSystem sliderFileSystem, - Path tempPath, - String libdir, - String[] resources, - Class[] classes - ) throws - IOException, - SliderException { - if (resources.length != classes.length) { - throw new SliderInternalStateException( - "mismatch in Jar names [%d] and classes [%d]", - resources.length, - classes.length); - } - int size = resources.length; - for (int i = 0; i < size; i++) { - String jarName = resources[i]; - Class clazz = classes[i]; - SliderUtils.putJar(providerResources, - sliderFileSystem, - clazz, - tempPath, - libdir, - jarName); - } - - } - - /** - * Loads all dependency jars from the default path + * Loads all dependency jars from the default path. 
* @param providerResources map of provider resources to add these entries to * @param sliderFileSystem target filesystem * @param tempPath path in the cluster FS for temp files * @param libDir relative directory to place resources * @param libLocalSrcDir explicitly supplied local libs dir - * @throws IOException - * @throws SliderException - */ - public static void addAllDependencyJars(Map providerResources, - SliderFileSystem sliderFileSystem, - Path tempPath, - String libDir, - String libLocalSrcDir) + * @throws IOException trouble copying to HDFS + * @throws SliderException trouble copying to HDFS + */ + public static void addAllDependencyJars( + Map providerResources, + SliderFileSystem sliderFileSystem, + Path tempPath, + String libDir, + String libLocalSrcDir) throws IOException, SliderException { - String libSrcToUse = libLocalSrcDir; if (SliderUtils.isSet(libLocalSrcDir)) { File file = new File(libLocalSrcDir); if (!file.exists() || !file.isDirectory()) { - throw new BadCommandArgumentsException("Supplied lib src dir %s is not valid", libLocalSrcDir); + throw new BadCommandArgumentsException( + "Supplied lib src dir %s is not valid", libLocalSrcDir); } } - SliderUtils.putAllJars(providerResources, sliderFileSystem, tempPath, libDir, libSrcToUse); + SliderUtils.putAllJars(providerResources, sliderFileSystem, tempPath, + libDir, libLocalSrcDir); } + /** - * build the log directory - * @return the log dir + * Validate the requested number of instances of a component. + *

+ * If max <= 0: min <= count + * If max > 0: min <= count <= max + * @param instanceDescription configuration + * @param name node class name + * @param min requested heap size + * @param max maximum value. + * @throws BadCommandArgumentsException if the values are out of range */ - public String getLogdir() throws IOException { - String logdir = System.getenv("LOGDIR"); - if (logdir == null) { - logdir = - SliderKeys.TMP_LOGDIR_PREFIX + UserGroupInformation.getCurrentUser().getShortUserName(); - } - return logdir; - } - - public void validateNodeCount(AggregateConf instanceDescription, - String name, int min, int max) throws - BadCommandArgumentsException { + String name, int min, int max) + throws BadCommandArgumentsException { MapOperations component = - instanceDescription.getResourceOperations().getComponent(name); + instanceDescription.getResourceOperations().getComponent(name); int count; if (component == null) { count = 0; @@ -229,7 +224,7 @@ public class ProviderUtils implements RoleKeys { } /** - * Validate the node count and heap size values of a node class + * Validate the count is between min and max. *

* If max <= 0: min <= count * If max > 0: min <= count <= max @@ -256,33 +251,36 @@ public class ProviderUtils implements RoleKeys { } /** - * copy all options beginning site. into the site.xml - * @param clusterSpec cluster specification - * @param sitexml map for XML file to build up + * Copy options beginning with "site.configName." prefix from options map + * to sitexml map, removing the prefix and substituting the tokens + * specified in the tokenMap. + * @param options source map + * @param sitexml destination map + * @param configName optional ".configName" portion of the prefix + * @param tokenMap key/value pairs to substitute into the option values */ - public void propagateSiteOptions(ClusterDescription clusterSpec, - Map sitexml) { - Map options = clusterSpec.options; - propagateSiteOptions(options, sitexml); - } - - public void propagateSiteOptions(Map options, - Map sitexml) { - propagateSiteOptions(options, sitexml, ""); - } - public void propagateSiteOptions(Map options, - Map sitexml, - String configName) { - propagateSiteOptions(options, sitexml, configName, null); + Map sitexml, + String configName, + Map tokenMap) { + String prefix = OptionKeys.SITE_XML_PREFIX + + (!configName.isEmpty() ? configName + "." : ""); + propagateOptions(options, sitexml, tokenMap, prefix); } - public void propagateSiteOptions(Map options, + /** + * Copy options beginning with prefix from options map + * to sitexml map, removing the prefix and substituting the tokens + * specified in the tokenMap. + * @param options source map + * @param sitexml destination map + * @param tokenMap key/value pairs to substitute into the option values + * @param prefix which options to copy to destination map + */ + public void propagateOptions(Map options, Map sitexml, - String configName, - Map tokenMap) { - String prefix = OptionKeys.SITE_XML_PREFIX + - (!configName.isEmpty() ? configName + "." : ""); + Map tokenMap, + String prefix) { for (Map.Entry entry : options.entrySet()) { String key = entry.getKey(); if (key.startsWith(prefix)) { @@ -302,229 +300,1038 @@ public class ProviderUtils implements RoleKeys { } /** - * Propagate an option from the cluster specification option map - * to the site XML map, using the site key for the name - * @param global global config spec - * @param optionKey key in the option map - * @param sitexml map for XML file to build up - * @param siteKey key to assign the value to in the site XML - * @throws BadConfigException if the option is missing from the cluster spec - */ - public void propagateOption(MapOperations global, - String optionKey, - Map sitexml, - String siteKey) throws BadConfigException { - sitexml.put(siteKey, global.getMandatoryOption(optionKey)); - } - - - /** - * Build the image dir. 
This path is relative and only valid at the far end - * @param instanceDefinition instance definition - * @param bindir bin subdir - * @param script script in bin subdir - * @return the path to the script - * @throws FileNotFoundException if a file is not found, or it is not a directory* - */ - public String buildPathToHomeDir(AggregateConf instanceDefinition, - String bindir, - String script) throws - FileNotFoundException, - BadConfigException { - MapOperations globalOptions = - instanceDefinition.getInternalOperations().getGlobalOptions(); - String applicationHome = - globalOptions.get(InternalKeys.INTERNAL_APPLICATION_HOME); - String imagePath = - globalOptions.get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH); - return buildPathToHomeDir(imagePath, applicationHome, bindir, script); - } - - public String buildPathToHomeDir(String imagePath, - String applicationHome, - String bindir, String script) throws - FileNotFoundException { - String path; - File scriptFile; - if (imagePath != null) { - File tarball = new File(SliderKeys.LOCAL_TARBALL_INSTALL_SUBDIR); - scriptFile = findBinScriptInExpandedArchive(tarball, bindir, script); - // now work back from the script to build the relative path - // to the binary which will be valid remote or local - StringBuilder builder = new StringBuilder(); - builder.append(SliderKeys.LOCAL_TARBALL_INSTALL_SUBDIR); - builder.append("/"); - //for the script, we want the name of ../.. - File archive = scriptFile.getParentFile().getParentFile(); - builder.append(archive.getName()); - path = builder.toString(); + * Substitute tokens into option map values, returning a new map. + * @param options source map + * @param tokenMap key/value pairs to substitute into the option values + * @return map with substituted values + */ + public Map filterSiteOptions(Map options, + Map tokenMap) { + String prefix = OptionKeys.SITE_XML_PREFIX; + Map filteredOptions = new HashMap<>(); + for (Map.Entry entry : options.entrySet()) { + String key = entry.getKey(); + if (key.startsWith(prefix)) { + String value = entry.getValue(); + if (tokenMap != null) { + for (Map.Entry token : tokenMap.entrySet()) { + value = value.replaceAll(Pattern.quote(token.getKey()), + token.getValue()); + } + } + filteredOptions.put(key, value); + } + } + return filteredOptions; + } + + /** + * Get resource requirements from a String value. If value isn't specified, + * use the default value. If value is greater than max, use the max value. + * @param val string value + * @param defVal default value + * @param maxVal maximum value + * @return int resource requirement + */ + public int getRoleResourceRequirement(String val, + int defVal, + int maxVal) { + if (val==null) { + val = Integer.toString(defVal); + } + Integer intVal; + if (ResourceKeys.YARN_RESOURCE_MAX.equals(val)) { + intVal = maxVal; + } else { + intVal = Integer.decode(val); + } + return intVal; + } + + /** + * Localize the service keytabs for the application. 
+ * @param launcher container launcher + * @param instanceDefinition app specification + * @param fileSystem file system + * @param clusterName app name + * @throws IOException trouble uploading to HDFS + */ + public void localizeServiceKeytabs(ContainerLauncher launcher, + AggregateConf instanceDefinition, SliderFileSystem fileSystem, + String clusterName) throws IOException { + ConfTreeOperations appConf = instanceDefinition.getAppConfOperations(); + String keytabPathOnHost = appConf.getComponent(COMPONENT_AM).get( + SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH); + if (SliderUtils.isUnset(keytabPathOnHost)) { + String amKeytabName = appConf.getComponent(COMPONENT_AM).get( + SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME); + String keytabDir = appConf.getComponent(COMPONENT_AM).get( + SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR); + // we need to localize the keytab files in the directory + Path keytabDirPath = fileSystem.buildKeytabPath(keytabDir, null, + clusterName); + boolean serviceKeytabsDeployed = false; + if (fileSystem.getFileSystem().exists(keytabDirPath)) { + FileStatus[] keytabs = fileSystem.getFileSystem().listStatus( + keytabDirPath); + LocalResource keytabRes; + for (FileStatus keytab : keytabs) { + if (!amKeytabName.equals(keytab.getPath().getName()) + && keytab.getPath().getName().endsWith(".keytab")) { + serviceKeytabsDeployed = true; + log.info("Localizing keytab {}", keytab.getPath().getName()); + keytabRes = fileSystem.createAmResource(keytab.getPath(), + LocalResourceType.FILE); + launcher.addLocalResource(KEYTAB_DIR + "/" + + keytab.getPath().getName(), + keytabRes); + } + } + } + if (!serviceKeytabsDeployed) { + log.warn("No service keytabs for the application have been localized. " + + "If the application requires keytabs for secure operation, " + + "please ensure that the required keytabs have been uploaded " + + "to the folder {}", keytabDirPath); + } + } + } + + /** + * Return whether two-way SSL is enabled for Agent / AM communication. + * @param amComponent component specification + * @return true if enabled + */ + public boolean hasTwoWaySSLEnabled(MapOperations amComponent) { + return amComponent != null ? 
+ amComponent.getOptionBool(TWO_WAY_SSL_ENABLED, false) : false; + } + + /** + * Generate and localize SSL certs for Agent / AM communication + * @param launcher container launcher + * @param container allocated container information + * @param fileSystem file system + * @param clusterName app name + * @throws SliderException certs cannot be generated/uploaded + */ + public void localizeContainerSSLResources(ContainerLauncher launcher, + Container container, SliderFileSystem fileSystem, String clusterName) + throws SliderException { + try { + // localize server cert + Path certsDir = fileSystem.buildClusterSecurityDirPath(clusterName); + LocalResource certResource = fileSystem.createAmResource( + new Path(certsDir, CRT_FILE_NAME), + LocalResourceType.FILE); + launcher.addLocalResource(CERT_FILE_LOCALIZATION_PATH, certResource); + + // generate and localize agent cert + CertificateManager certMgr = new CertificateManager(); + String hostname = container.getNodeId().getHost(); + String containerId = container.getId().toString(); + certMgr.generateContainerCertificate(hostname, containerId); + LocalResource agentCertResource = fileSystem.createAmResource( + uploadSecurityResource( + CertificateManager.getAgentCertficateFilePath(containerId), + fileSystem, clusterName), LocalResourceType.FILE); + // still using hostname as file name on the agent side, but the files + // do end up under the specific container's file space + launcher.addLocalResource(INFRA_RUN_SECURITY_DIR + hostname + + ".crt", agentCertResource); + LocalResource agentKeyResource = fileSystem.createAmResource( + uploadSecurityResource( + CertificateManager.getAgentKeyFilePath(containerId), fileSystem, + clusterName), + LocalResourceType.FILE); + launcher.addLocalResource(INFRA_RUN_SECURITY_DIR + hostname + + ".key", agentKeyResource); + + } catch (Exception e) { + throw new SliderException(SliderExitCodes.EXIT_DEPLOYMENT_FAILED, e, + "Unable to localize certificates. Two-way SSL cannot be enabled"); + } + } + + /** + * Upload a local file to the cluster security dir in HDFS. If the file + * already exists, it is not replaced. + * @param resource file to upload + * @param fileSystem file system + * @param clusterName app name + * @return Path of the uploaded file + * @throws IOException file cannot be uploaded + */ + private Path uploadSecurityResource(File resource, + SliderFileSystem fileSystem, String clusterName) throws IOException { + Path certsDir = fileSystem.buildClusterSecurityDirPath(clusterName); + return uploadResource(resource, fileSystem, certsDir); + } + + /** + * Upload a local file to the cluster resources dir in HDFS. If the file + * already exists, it is not replaced. + * @param resource file to upload + * @param fileSystem file system + * @param roleName optional subdirectory (for component-specific resources) + * @param clusterName app name + * @return Path of the uploaded file + * @throws IOException file cannot be uploaded + */ + private Path uploadResource(File resource, SliderFileSystem fileSystem, + String roleName, String clusterName) throws IOException { + Path dir; + if (roleName == null) { + dir = fileSystem.buildClusterResourcePath(clusterName); + } else { + dir = fileSystem.buildClusterResourcePath(clusterName, roleName); + } + return uploadResource(resource, fileSystem, dir); + } + /** + * Upload a local file to a specified HDFS directory. If the file already + * exists, it is not replaced. 
+ * @param resource file to upload + * @param fileSystem file system + * @param parentDir destination directory in HDFS + * @return Path of the uploaded file + * @throws IOException file cannot be uploaded + */ + private synchronized Path uploadResource(File resource, + SliderFileSystem fileSystem, Path parentDir) throws IOException { + if (!fileSystem.getFileSystem().exists(parentDir)) { + fileSystem.getFileSystem().mkdirs(parentDir, + new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE)); + } + Path destPath = new Path(parentDir, resource.getName()); + if (!fileSystem.getFileSystem().exists(destPath)) { + FSDataOutputStream os = null; + try { + os = fileSystem.getFileSystem().create(destPath); + byte[] contents = FileUtils.readFileToByteArray(resource); + os.write(contents, 0, contents.length); + os.flush(); + } finally { + IOUtils.closeStream(os); + } + log.info("Uploaded {} to localization path {}", resource, destPath); } else { - // using a home directory which is required to be present on - // the local system -so will be absolute and resolvable - File homedir = new File(applicationHome); - path = homedir.getAbsolutePath(); + log.info("Resource {} already existed at localization path {}", resource, + destPath); + } - //this is absolute, resolve its entire path - SliderUtils.verifyIsDir(homedir, log); - File bin = new File(homedir, bindir); - SliderUtils.verifyIsDir(bin, log); - scriptFile = new File(bin, script); - SliderUtils.verifyFileExists(scriptFile, log); + while (!fileSystem.getFileSystem().exists(destPath)) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + // ignore + } } - return path; + + fileSystem.getFileSystem().setPermission(destPath, + new FsPermission(FsAction.READ, FsAction.NONE, FsAction.NONE)); + + return destPath; } - /** - * Build the image dir. This path is relative and only valid at the far end - * @param instance instance options - * @param bindir bin subdir - * @param script script in bin subdir - * @return the path to the script - * @throws FileNotFoundException if a file is not found, or it is not a directory* - */ - public String buildPathToScript(AggregateConf instance, - String bindir, - String script) throws FileNotFoundException { - return buildPathToScript(instance.getInternalOperations(), bindir, script); - } - /** - * Build the image dir. This path is relative and only valid at the far end - * @param internal internal options - * @param bindir bin subdir - * @param script script in bin subdir - * @return the path to the script - * @throws FileNotFoundException if a file is not found, or it is not a directory* - */ - public String buildPathToScript(ConfTreeOperations internal, - String bindir, - String script) throws FileNotFoundException { - - String homedir = buildPathToHomeDir( - internal.get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH), - internal.get(InternalKeys.INTERNAL_APPLICATION_HOME), - bindir, - script); - return buildScriptPath(bindir, script, homedir); + * Write a configuration property map to a local file in a specified format. 
+ * @param fileSystem file system + * @param file destination file + * @param configFormat file format + * @param configFileDN file description + * @param config properties to save to the file + * @param clusterName app name + * @throws IOException file cannot be created + */ + private void createConfigFile(SliderFileSystem fileSystem, File file, + ConfigFormat configFormat, String configFileDN, + Map config, String clusterName) throws IOException { + log.info("Writing {} file {}", configFormat, file); + + ConfigUtils.prepConfigForTemplateOutputter(configFormat, config, + fileSystem, clusterName, file.getName()); + PublishedConfiguration publishedConfiguration = + new PublishedConfiguration(configFileDN, + config.entrySet()); + PublishedConfigurationOutputter configurationOutputter = + PublishedConfigurationOutputter.createOutputter(configFormat, + publishedConfiguration); + configurationOutputter.save(file); } - - - public String buildScriptPath(String bindir, String script, String homedir) { - StringBuilder builder = new StringBuilder(homedir); - builder.append("/"); - builder.append(bindir); - builder.append("/"); - builder.append(script); - return builder.toString(); + /** + * Determine config files requested in the appConf, generate the files, and + * localize them. + * @param launcher container launcher + * @param roleName component name + * @param roleGroup component group + * @param appConf app configurations + * @param configs configurations grouped by config name + * @param env environment variables + * @param fileSystem file system + * @param clusterName app name + * @throws IOException file(s) cannot be uploaded + * @throws BadConfigException file name not specified or file format not + * supported + */ + public void localizeConfigFiles(ContainerLauncher launcher, + String roleName, String roleGroup, + ConfTreeOperations appConf, + Map> configs, + MapOperations env, + SliderFileSystem fileSystem, + String clusterName) + throws IOException, BadConfigException { + for (Entry> configEntry : configs.entrySet()) { + String configFileName = appConf.getComponentOpt(roleGroup, + OptionKeys.CONF_FILE_PREFIX + configEntry.getKey() + OptionKeys + .NAME_SUFFIX, null); + String configFileType = appConf.getComponentOpt(roleGroup, + OptionKeys.CONF_FILE_PREFIX + configEntry.getKey() + OptionKeys + .TYPE_SUFFIX, null); + if (configFileName == null && configFileType == null) { + // config file not requested, so continue + continue; + } + if (configFileName == null) { + throw new BadConfigException("Config file name null for " + + configEntry.getKey()); + } + if (configFileType == null) { + throw new BadConfigException("Config file type null for " + + configEntry.getKey()); + } + ConfigFormat configFormat = ConfigFormat.resolve(configFileType); + if (configFormat == null) { + throw new BadConfigException("Config format " + configFormat + + " doesn't exist"); + } + localizeConfigFile(launcher, roleName, roleGroup, configEntry.getKey(), + configFormat, configFileName, configs, env, fileSystem, clusterName); + } } + /** + * Create and localize a config file. 
+ * @param launcher container launcher + * @param roleName component name + * @param roleGroup component group + * @param configFileDN config description/name + * @param configFormat config format + * @param configFileName config file name + * @param configs configs grouped by config description/name + * @param env environment variables + * @param fileSystem file system + * @param clusterName app name + * @throws IOException file cannot be uploaded + */ + public void localizeConfigFile(ContainerLauncher launcher, + String roleName, String roleGroup, + String configFileDN, ConfigFormat configFormat, String configFileName, + Map> configs, + MapOperations env, + SliderFileSystem fileSystem, + String clusterName) + throws IOException { + if (launcher == null) { + return; + } + Map config = ConfigUtils.replacePropsInConfig( + configs.get(configFileDN), env.options); + String fileName = ConfigUtils.replaceProps(config, configFileName); + File localFile = new File(RESOURCE_DIR); + if (!localFile.exists()) { + if (!localFile.mkdir()) { + throw new IOException(RESOURCE_DIR + " could not be created!"); + } + } + localFile = new File(localFile, new File(fileName).getName()); - public static String convertToAppRelativePath(File file) { - return convertToAppRelativePath(file.getPath()); + String folder = null; + if ("true".equals(config.get(PER_COMPONENT))) { + folder = roleName; + } else if ("true".equals(config.get(PER_GROUP))) { + folder = roleGroup; + } + + log.info("Localizing {} configs to config file {} (destination {}) " + + "based on {} configs", config.size(), localFile, fileName, + configFileDN); + createConfigFile(fileSystem, localFile, configFormat, configFileDN, config, + clusterName); + Path destPath = uploadResource(localFile, fileSystem, folder, clusterName); + LocalResource configResource = fileSystem.createAmResource(destPath, + LocalResourceType.FILE); + + File destFile = new File(fileName); + if (destFile.isAbsolute()) { + launcher.addLocalResource( + RESOURCE_DIR + "/" + destFile.getName(), + configResource, fileName); + } else { + launcher.addLocalResource(APP_CONF_DIR + "/" + fileName, + configResource); + } } - public static String convertToAppRelativePath(String path) { - return ApplicationConstants.Environment.PWD.$() + "/" + path; + /** + * Generate and localize security stores requested by the app. Also perform + * last-minute substitution of cluster name into credentials strings. 
+ * @param launcher container launcher + * @param container allocated container information + * @param role component name + * @param fileSystem file system + * @param instanceDefinition app specification + * @param compOps component specification + * @param clusterName app name + * @throws SliderException stores cannot be generated/uploaded + * @throws IOException stores cannot be generated/uploaded + */ + public void localizeContainerSecurityStores(ContainerLauncher launcher, + Container container, + String role, + SliderFileSystem fileSystem, + AggregateConf instanceDefinition, + MapOperations compOps, + String clusterName) + throws SliderException, IOException { + // substitute CLUSTER_NAME into credentials + Map> newcred = new HashMap<>(); + for (Entry> entry : + instanceDefinition.getAppConf().credentials.entrySet()) { + List resultList = new ArrayList<>(); + for (String v : entry.getValue()) { + resultList.add(v.replaceAll(Pattern.quote("${CLUSTER_NAME}"), + clusterName).replaceAll(Pattern.quote("${CLUSTER}"), + clusterName)); + } + newcred.put(entry.getKey().replaceAll(Pattern.quote("${CLUSTER_NAME}"), + clusterName).replaceAll(Pattern.quote("${CLUSTER}"), + clusterName), + resultList); + } + instanceDefinition.getAppConf().credentials = newcred; + + // generate and localize security stores + SecurityStore[] stores = generateSecurityStores(container, role, + instanceDefinition, compOps); + for (SecurityStore store : stores) { + LocalResource keystoreResource = fileSystem.createAmResource( + uploadSecurityResource(store.getFile(), fileSystem, clusterName), + LocalResourceType.FILE); + launcher.addLocalResource(String.format("secstores/%s-%s.p12", + store.getType(), role), + keystoreResource); + } + } + + /** + * Generate security stores requested by the app. + * @param container allocated container information + * @param role component name + * @param instanceDefinition app specification + * @param compOps component specification + * @return security stores + * @throws SliderException stores cannot be generated + * @throws IOException stores cannot be generated + */ + private SecurityStore[] generateSecurityStores(Container container, + String role, + AggregateConf instanceDefinition, + MapOperations compOps) + throws SliderException, IOException { + return StoresGenerator.generateSecurityStores( + container.getNodeId().getHost(), container.getId().toString(), + role, instanceDefinition, compOps); } + /** + * Return whether security stores are requested by the app. + * @param compOps component specification + * @return true if stores are requested + */ + public boolean areStoresRequested(MapOperations compOps) { + return compOps != null ? compOps. + getOptionBool(COMP_STORES_REQUIRED_KEY, false) : false; + } - public static void validatePathReferencesLocalDir(String meaning, String path) - throws BadConfigException { - File file = new File(path); - if (!file.exists()) { - throw new BadConfigException("%s directory %s not found", meaning, file); + /** + * Localize application tarballs and other resources requested by the app. 
+ * @param launcher container launcher + * @param fileSystem file system + * @param appConf app configurations + * @param roleGroup component group + * @param clusterName app name + * @throws IOException resources cannot be uploaded + * @throws BadConfigException package name or type is not specified + */ + public void localizePackages(ContainerLauncher launcher, + SliderFileSystem fileSystem, ConfTreeOperations appConf, String roleGroup, + String clusterName) throws IOException, BadConfigException { + for (Entry> pkg : + getPackages(roleGroup, appConf).entrySet()) { + String pkgName = pkg.getValue().get(OptionKeys.NAME_SUFFIX); + String pkgType = pkg.getValue().get(OptionKeys.TYPE_SUFFIX); + Path pkgPath = fileSystem.buildResourcePath(pkgName); + if (!fileSystem.isFile(pkgPath)) { + pkgPath = fileSystem.buildResourcePath(clusterName, + pkgName); + } + if (!fileSystem.isFile(pkgPath)) { + throw new IOException("Package doesn't exist as a resource: " + + pkgName); + } + log.info("Adding resource {}", pkgName); + LocalResourceType type = LocalResourceType.FILE; + if ("archive".equals(pkgType)) { + type = LocalResourceType.ARCHIVE; + } + LocalResource packageResource = fileSystem.createAmResource( + pkgPath, type); + launcher.addLocalResource(APP_PACKAGES_DIR, packageResource); } - if (!file.isDirectory()) { - throw new BadConfigException("%s is not a directory: %s", meaning, file); + } + + /** + * Build a map of configuration description/name to configuration key/value + * properties, with all known tokens substituted into the property values. + * @param appConf app configurations + * @param internalsConf internal configurations + * @param containerId container ID + * @param roleName component name + * @param roleGroup component group + * @param amState access to AM state + * @return configuration properties grouped by config description/name + */ + public Map> buildConfigurations( + ConfTreeOperations appConf, ConfTreeOperations internalsConf, + String containerId, String roleName, String roleGroup, + StateAccessForProviders amState) { + + Map> configurations = new TreeMap<>(); + Map tokens = getStandardTokenMap(appConf, + internalsConf, roleName, roleGroup, containerId); + + Set configs = new HashSet<>(); + configs.addAll(getApplicationConfigurationTypes(roleGroup, appConf)); + configs.addAll(getSystemConfigurationsRequested(appConf)); + + for (String configType : configs) { + addNamedConfiguration(configType, appConf.getGlobalOptions().options, + configurations, tokens, amState); + if (appConf.getComponent(roleGroup) != null) { + addNamedConfiguration(configType, + appConf.getComponent(roleGroup).options, configurations, tokens, + amState); + } } + + //do a final replacement of re-used configs + dereferenceAllConfigs(configurations); + + return configurations; } /** - * get the user name - * @return the user name + * Substitute "site." prefixed configuration values into other configuration + * values where needed. The format for these substitutions is that + * {@literal ${@//site/configDN/key}} will be replaced by the value for the + * "site.configDN.key" property. 
+ * @param configurations configuration properties grouped by config + * description/name */ - public String getUserName() throws IOException { - return UserGroupInformation.getCurrentUser().getShortUserName(); + public void dereferenceAllConfigs( + Map> configurations) { + Map allConfigs = new HashMap<>(); + String lookupFormat = "${@//site/%s/%s}"; + for (String configType : configurations.keySet()) { + Map configBucket = configurations.get(configType); + for (String configName : configBucket.keySet()) { + allConfigs.put(String.format(lookupFormat, configType, configName), + configBucket.get(configName)); + } + } + + boolean finished = false; + while (!finished) { + finished = true; + for (Map.Entry entry : allConfigs.entrySet()) { + String configValue = entry.getValue(); + for (Map.Entry lookUpEntry : allConfigs.entrySet()) { + String lookUpValue = lookUpEntry.getValue(); + if (lookUpValue.contains("${@//site/")) { + continue; + } + String lookUpKey = lookUpEntry.getKey(); + if (configValue != null && configValue.contains(lookUpKey)) { + configValue = configValue.replace(lookUpKey, lookUpValue); + } + } + if (!configValue.equals(entry.getValue())) { + finished = false; + allConfigs.put(entry.getKey(), configValue); + } + } + } + + for (String configType : configurations.keySet()) { + Map configBucket = configurations.get(configType); + for (Map.Entry entry: configBucket.entrySet()) { + String configName = entry.getKey(); + String configValue = entry.getValue(); + for (Map.Entry lookUpEntry : allConfigs.entrySet()) { + String lookUpValue = lookUpEntry.getValue(); + if (lookUpValue.contains("${@//site/")) { + continue; + } + String lookUpKey = lookUpEntry.getKey(); + if (configValue != null && configValue.contains(lookUpKey)) { + configValue = configValue.replace(lookUpKey, lookUpValue); + } + } + configBucket.put(configName, configValue); + } + } } /** - * Find a script in an expanded archive - * @param base base directory - * @param bindir bin subdir - * @param script script in bin subdir - * @return the path to the script - * @throws FileNotFoundException if a file is not found, or it is not a directory + * Return a set of configuration description/names represented in the app. 
+ * configuration + * @param roleGroup component group + * @param appConf app configurations + * @return set of configuration description/names */ - public File findBinScriptInExpandedArchive(File base, - String bindir, - String script) - throws FileNotFoundException { - - SliderUtils.verifyIsDir(base, log); - File[] ls = base.listFiles(); - if (ls == null) { - //here for the IDE to be happy, as the previous check will pick this case - throw new FileNotFoundException("Failed to list directory " + base); + public Set getApplicationConfigurationTypes(String roleGroup, + ConfTreeOperations appConf) { + Set configList = new HashSet<>(); + + String prefix = OptionKeys.CONF_FILE_PREFIX; + String suffix = OptionKeys.TYPE_SUFFIX; + MapOperations component = appConf.getComponent(roleGroup); + if (component != null) { + addConfsToList(component, configList, prefix, suffix); } + addConfsToList(appConf.getGlobalOptions(), configList, prefix, suffix); - log.debug("Found {} entries in {}", ls.length, base); - List directories = new LinkedList(); - StringBuilder dirs = new StringBuilder(); - for (File file : ls) { - log.debug("{}", false); - if (file.isDirectory()) { - directories.add(file); - dirs.append(file.getPath()).append(" "); + return configList; + } + + /** + * Finds all configuration description/names of the form + * prefixconfigDNsuffix in the configuration (e.g. conf.configDN.type). + * @param confMap configuration properties + * @param confList set containing configuration description/names + * @param prefix configuration key prefix to match + * @param suffix configuration key suffix to match + */ + private void addConfsToList(Map confMap, + Set confList, String prefix, String suffix) { + for (String key : confMap.keySet()) { + if (key.startsWith(prefix) && key.endsWith(suffix)) { + String confName = key.substring(prefix.length(), + key.length() - suffix.length()); + if (!confName.isEmpty()) { + confList.add(confName); + } } } - if (directories.size() > 1) { - throw new FileNotFoundException( - "Too many directories in archive to identify binary: " + dirs); + } + + /** + * Build a map of package description/name to package key/value properties + * (there should be two properties, type and name). 
+ * @param roleGroup component group + * @param appConf app configurations + * @return map of package description/name to package key/value properties + * @throws BadConfigException package name or type is not specified + */ + public Map> getPackages(String roleGroup, + ConfTreeOperations appConf) throws BadConfigException { + Map> packages = new HashMap<>(); + String prefix = OptionKeys.PKG_FILE_PREFIX; + String typeSuffix = OptionKeys.TYPE_SUFFIX; + String nameSuffix = OptionKeys.NAME_SUFFIX; + MapOperations component = appConf.getComponent(roleGroup); + if (component == null) { + component = appConf.getGlobalOptions(); + } + for (Map.Entry entry : component.entrySet()) { + String key = entry.getKey(); + if (key.startsWith(prefix)) { + String confName; + String type; + if (key.endsWith(typeSuffix)) { + confName = key.substring(prefix.length(), key.length() - typeSuffix.length()); + type = typeSuffix; + } else if (key.endsWith(nameSuffix)) { + confName = key.substring(prefix.length(), key.length() - nameSuffix.length()); + type = nameSuffix; + } else { + continue; + } + if (!packages.containsKey(confName)) { + packages.put(confName, new HashMap()); + } + packages.get(confName).put(type, entry.getValue()); + } } - if (directories.isEmpty()) { - throw new FileNotFoundException( - "No directory found in archive " + base); + + for (Entry> pkg : packages.entrySet()) { + if (!pkg.getValue().containsKey(OptionKeys.TYPE_SUFFIX)) { + throw new BadConfigException("Package " + pkg.getKey() + " doesn't " + + "have a package type"); + } + if (!pkg.getValue().containsKey(OptionKeys.NAME_SUFFIX)) { + throw new BadConfigException("Package " + pkg.getKey() + " doesn't " + + "have a package name"); + } } - File archive = directories.get(0); - File bin = new File(archive, bindir); - SliderUtils.verifyIsDir(bin, log); - File scriptFile = new File(bin, script); - SliderUtils.verifyFileExists(scriptFile, log); - return scriptFile; + + return packages; } /** - * Return any additional arguments (argv) to provide when starting this role - * - * @param roleOptions - * The options for this role - * @return A non-null String which contains command line arguments for this role, or the empty string. + * Return system configurations requested by the app. + * @param appConf app configurations + * @return set of system configurations */ - public static String getAdditionalArgs(Map roleOptions) { - if (roleOptions.containsKey(RoleKeys.ROLE_ADDITIONAL_ARGS)) { - String additionalArgs = roleOptions.get(RoleKeys.ROLE_ADDITIONAL_ARGS); - if (null != additionalArgs) { - return additionalArgs; + public Set getSystemConfigurationsRequested( + ConfTreeOperations appConf) { + Set configList = new HashSet<>(); + + String configTypes = appConf.get(SYSTEM_CONFIGS); + if (configTypes != null && configTypes.length() > 0) { + String[] configs = configTypes.split(","); + for (String config : configs) { + configList.add(config.trim()); } } - return ""; + return configList; } - - public int getRoleResourceRequirement(String val, - int defVal, - int maxVal) { - if (val==null) { - val = Integer.toString(defVal); + + /** + * For a given config description/name, pull out its site configs from the + * source config map, remove the site.configDN. prefix from them, and place + * them into a new config map using the {@link #propagateSiteOptions} method + * (with tokens substituted). This new k/v map is put as the value for the + * configDN key in the configurations map. 
+ * @param configName config description/name + * @param sourceConfig config containing site.* properties + * @param configurations configuration map to be populated + * @param tokens initial substitution tokens + * @param amState access to AM state + */ + private void addNamedConfiguration(String configName, + Map sourceConfig, + Map> configurations, + Map tokens, StateAccessForProviders amState) { + Map config = new HashMap<>(); + if (configName.equals(GLOBAL_CONFIG_TAG)) { + addDefaultGlobalConfig(config); } - Integer intVal; - if (ResourceKeys.YARN_RESOURCE_MAX.equals(val)) { - intVal = maxVal; + // add role hosts to tokens + addRoleRelatedTokens(tokens, amState); + propagateSiteOptions(sourceConfig, config, configName, tokens); + + configurations.put(configName, config); + } + + /** + * Get initial token map to be substituted into config values. + * @param appConf app configurations + * @param internals internal configurations + * @param componentName component name + * @param componentGroup component group + * @param clusterName app name + * @return tokens to replace + */ + public Map getStandardTokenMap(ConfTreeOperations appConf, + ConfTreeOperations internals, String componentName, + String componentGroup, String clusterName) { + return getStandardTokenMap(appConf, internals, componentName, + componentGroup, null, clusterName); + } + + /** + * Get initial token map to be substituted into config values. + * @param appConf app configurations + * @param internals internal configurations + * @param componentName component name + * @param componentGroup component group + * @param containerId container ID + * @param clusterName app name + * @return tokens to replace + */ + public Map getStandardTokenMap(ConfTreeOperations appConf, + ConfTreeOperations internals, String componentName, + String componentGroup, String containerId, String clusterName) { + + Map tokens = new HashMap<>(); + if (containerId != null) { + tokens.put("${CONTAINER_ID}", containerId); + } + String nnuri = appConf.get("site.fs.defaultFS"); + tokens.put("${NN_URI}", nnuri); + tokens.put("${NN_HOST}", URI.create(nnuri).getHost()); + tokens.put("${ZK_HOST}", appConf.get(OptionKeys.ZOOKEEPER_HOSTS)); + tokens.put("${DEFAULT_ZK_PATH}", appConf.get(OptionKeys.ZOOKEEPER_PATH)); + String prefix = appConf.getComponentOpt(componentGroup, ROLE_PREFIX, + null); + String dataDirSuffix = ""; + if (prefix == null) { + prefix = ""; } else { - intVal = Integer.decode(val); + dataDirSuffix = "_" + SliderUtils.trimPrefix(prefix); } - return intVal; + tokens.put("${DEFAULT_DATA_DIR}", internals.getGlobalOptions() + .getOption(InternalKeys.INTERNAL_DATA_DIR_PATH, null) + dataDirSuffix); + tokens.put("${JAVA_HOME}", appConf.get(JAVA_HOME)); + tokens.put("${COMPONENT_NAME}", componentName); + tokens.put("${COMPONENT_NAME.lc}", componentName.toLowerCase()); + tokens.put("${COMPONENT_PREFIX}", prefix); + tokens.put("${COMPONENT_PREFIX.lc}", prefix.toLowerCase()); + if (!componentName.equals(componentGroup) && + componentName.startsWith(componentGroup)) { + tokens.put("${COMPONENT_ID}", + componentName.substring(componentGroup.length())); + } + if (clusterName != null) { + tokens.put("${CLUSTER_NAME}", clusterName); + tokens.put("${CLUSTER_NAME.lc}", clusterName.toLowerCase()); + tokens.put("${APP_NAME}", clusterName); + tokens.put("${APP_NAME.lc}", clusterName.toLowerCase()); + } + tokens.put("${APP_COMPONENT_NAME}", componentName); + tokens.put("${APP_COMPONENT_NAME.lc}", componentName.toLowerCase()); + return tokens; + } + + /** + * Add 
ROLE_HOST tokens for substitution into config values. + * @param tokens existing tokens + * @param amState access to AM state + */ + public void addRoleRelatedTokens(Map tokens, + StateAccessForProviders amState) { + if (amState == null) { + return; + } + for (Map.Entry> entry : + amState.getRoleClusterNodeMapping().entrySet()) { + String tokenName = entry.getKey().toUpperCase(Locale.ENGLISH) + "_HOST"; + String hosts = StringUtils .join(",", + getHostsList(entry.getValue().values(), true)); + tokens.put("${" + tokenName + "}", hosts); + } + } + + /** + * Add global configuration properties. + * @param config map where default global properties will be added + */ + private void addDefaultGlobalConfig(Map config) { + config.put("app_log_dir", "${LOG_DIR}"); + config.put("app_pid_dir", "${WORK_DIR}/app/run"); + config.put("app_install_dir", "${WORK_DIR}/app/install"); + config.put("app_conf_dir", "${WORK_DIR}/" + APP_CONF_DIR); + config.put("app_input_conf_dir", "${WORK_DIR}/" + PROPAGATED_CONF_DIR_NAME); + + // add optional parameters only if they are not already provided + if (!config.containsKey("pid_file")) { + config.put("pid_file", "${WORK_DIR}/app/run/component.pid"); + } + if (!config.containsKey("app_root")) { + config.put("app_root", "${WORK_DIR}/app/install"); + } + } + + /** + * Return a list of hosts based on current ClusterNodes. + * @param values cluster nodes + * @param hostOnly whether host or host/server name will be added to list + * @return list of hosts + */ + public Iterable getHostsList(Collection values, + boolean hostOnly) { + List hosts = new ArrayList<>(); + for (ClusterNode cn : values) { + hosts.add(hostOnly ? cn.host : cn.host + "/" + cn.name); + } + return hosts; + } + + /** + * Update ServiceRecord in Registry with IP and hostname. + * @param amState access to AM state + * @param yarnRegistry acces to YARN registry + * @param containerId container ID + * @param roleName component name + * @param ip list of IPs + * @param hostname hostname + */ + public void updateServiceRecord(StateAccessForProviders amState, + YarnRegistryViewForProviders yarnRegistry, + String containerId, String roleName, List ip, String hostname) { + try { + RoleInstance role = null; + if(ip != null && !ip.isEmpty()){ + role = amState.getOwnedContainer(containerId); + role.ip = ip.get(0); + } + if(hostname != null && !hostname.isEmpty()){ + role = amState.getOwnedContainer(containerId); + role.hostname = hostname; + } + if (role != null) { + // create and publish updated service record (including hostname & ip) + ServiceRecord record = new ServiceRecord(); + record.set(YarnRegistryAttributes.YARN_ID, containerId); + record.description = roleName.replaceAll("_", "-"); + record.set(YarnRegistryAttributes.YARN_PERSISTENCE, + PersistencePolicies.CONTAINER); + // TODO: use constants from YarnRegistryAttributes + if (role.ip != null) { + record.set("yarn:ip", role.ip); + } + if (role.hostname != null) { + record.set("yarn:hostname", role.hostname); + } + yarnRegistry.putComponent( + RegistryPathUtils.encodeYarnID(containerId), record); + } + } catch (NoSuchNodeException e) { + // ignore - there is nothing to do if we don't find a container + log.warn("Owned container {} not found - {}", containerId, e); + } catch (IOException e) { + log.warn("Error updating container {} service record in registry", + containerId, e); + } + } + + /** + * Publish a named property bag that may contain name-value pairs for app + * configurations such as hbase-site. 
+ * @param name config file identifying name + * @param description config file description + * @param entries config file properties + * @param amState access to AM state + */ + public void publishApplicationInstanceData(String name, String description, + Iterable> entries, + StateAccessForProviders amState) { + PublishedConfiguration pubconf = new PublishedConfiguration(description, + entries); + log.info("publishing {}", pubconf); + amState.getPublishedSliderConfigurations().put(name, pubconf); + } + + /** + * Publish an export group. + * @param exportGroup export groups + * @param amState access to AM state + * @param roleGroup component group + */ + public void publishExportGroup(Map> exportGroup, + StateAccessForProviders amState, String roleGroup) { + // Publish in old format for the time being + Map simpleEntries = new HashMap<>(); + for (Entry> entry : exportGroup.entrySet()) { + List exports = entry.getValue(); + if (SliderUtils.isNotEmpty(exports)) { + // there is no support for multiple exports per name, so extract only + // the first one + simpleEntries.put(entry.getKey(), entry.getValue().get(0).getValue()); + } + } + publishApplicationInstanceData(roleGroup, roleGroup, + simpleEntries.entrySet(), amState); + + PublishedExports exports = new PublishedExports(roleGroup); + exports.setUpdated(new Date().getTime()); + exports.putValues(exportGroup.entrySet()); + amState.getPublishedExportsSet().put(roleGroup, exports); + } + + public Map getExports(ConfTreeOperations appConf, + String roleGroup) { + Map exports = new HashMap<>(); + propagateOptions(appConf.getComponent(roleGroup).options, exports, + null, OptionKeys.EXPORT_PREFIX); + return exports; + } + + private static final String COMPONENT_TAG = "component"; + private static final String HOST_FOLDER_FORMAT = "%s:%s"; + private static final String CONTAINER_LOGS_TAG = "container_log_dirs"; + private static final String CONTAINER_PWDS_TAG = "container_work_dirs"; + + /** + * Format the folder locations and publish in the registry service. 
+ * @param folders folder information + * @param containerId container ID + * @param hostFqdn host FQDN + * @param componentName component name + */ + public void publishFolderPaths(Map folders, + String containerId, String componentName, String hostFqdn, + StateAccessForProviders amState, + Map logFolderExports, + Map workFolderExports) { + Date now = new Date(); + for (Map.Entry entry : folders.entrySet()) { + ExportEntry exportEntry = new ExportEntry(); + exportEntry.setValue(String.format(HOST_FOLDER_FORMAT, hostFqdn, + entry.getValue())); + exportEntry.setContainerId(containerId); + exportEntry.setLevel(COMPONENT_TAG); + exportEntry.setTag(componentName); + exportEntry.setUpdatedTime(now.toString()); + if (entry.getKey().equals("AGENT_LOG_ROOT") || + entry.getKey().equals("LOG_DIR")) { + synchronized (logFolderExports) { + logFolderExports.put(containerId, exportEntry); + } + } else { + synchronized (workFolderExports) { + workFolderExports.put(containerId, exportEntry); + } + } + log.info("Updating log and pwd folders for container {}", containerId); + } + + PublishedExports exports = new PublishedExports(CONTAINER_LOGS_TAG); + exports.setUpdated(now.getTime()); + synchronized (logFolderExports) { + updateExportsFromList(exports, logFolderExports); + } + amState.getPublishedExportsSet().put(CONTAINER_LOGS_TAG, exports); + + exports = new PublishedExports(CONTAINER_PWDS_TAG); + exports.setUpdated(now.getTime()); + synchronized (workFolderExports) { + updateExportsFromList(exports, workFolderExports); + } + amState.getPublishedExportsSet().put(CONTAINER_PWDS_TAG, exports); + } + + /** + * Update the export data from the map. + * @param exports published exports + * @param folderExports folder exports + */ + private void updateExportsFromList(PublishedExports exports, + Map folderExports) { + Map> perComponentList = new HashMap<>(); + for(Map.Entry logEntry : folderExports.entrySet()) { + String componentName = logEntry.getValue().getTag(); + if (!perComponentList.containsKey(componentName)) { + perComponentList.put(componentName, new ArrayList()); + } + perComponentList.get(componentName).add(logEntry.getValue()); + } + exports.putValues(perComponentList.entrySet()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org
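----------------------------------------------------------------------

Editor's illustration (not part of the patch): the patch introduces "conf." and "pkg." option prefixes in OptionKeys, and getApplicationConfigurationTypes/getPackages in ProviderUtils parse them by matching the prefix plus the ".type"/".name" suffix and treating the key material in between as the file or package identifier. The standalone sketch below reproduces that parsing with plain java.util types; the class name, option keys, and values are invented for the example.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PrefixParsingSketch {

  /** Collect every <name> declared via a conf.<name>.type option. */
  static Set<String> configNames(Map<String, String> options) {
    Set<String> names = new HashSet<>();
    for (String key : options.keySet()) {
      if (key.startsWith("conf.") && key.endsWith(".type")) {
        String name = key.substring("conf.".length(),
            key.length() - ".type".length());
        if (!name.isEmpty()) {
          names.add(name);
        }
      }
    }
    return names;
  }

  /** Group pkg.<name>.type / pkg.<name>.name options per package <name>. */
  static Map<String, Map<String, String>> packages(Map<String, String> options) {
    Map<String, Map<String, String>> pkgs = new HashMap<>();
    for (Map.Entry<String, String> e : options.entrySet()) {
      String key = e.getKey();
      if (!key.startsWith("pkg.")) {
        continue;
      }
      String suffix = key.endsWith(".type") ? ".type"
          : key.endsWith(".name") ? ".name" : null;
      if (suffix == null) {
        continue;
      }
      String name = key.substring("pkg.".length(),
          key.length() - suffix.length());
      pkgs.computeIfAbsent(name, k -> new HashMap<>()).put(suffix, e.getValue());
    }
    return pkgs;
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("conf.hbase-site.type", "xml");
    options.put("pkg.hbase.type", "archive");
    options.put("pkg.hbase.name", "hbase-1.2.0.tar.gz");
    System.out.println(configNames(options));            // [hbase-site]
    System.out.println(packages(options).get("hbase"));  // {.type=archive, .name=hbase-1.2.0.tar.gz}
  }
}

The real getPackages additionally rejects a package that is missing either its type or its name with a BadConfigException; the sketch only shows the grouping step.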
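Also for illustration only: getStandardTokenMap builds a map of ${...} tokens such as ${CLUSTER_NAME}, ${COMPONENT_NAME}, ${COMPONENT_NAME.lc} and ${CONTAINER_ID}, which are then substituted into the "site." property values before the config files are generated and localized. A minimal sketch of that substitution, with made-up cluster, component and path values:

import java.util.HashMap;
import java.util.Map;

public class TokenSubstitutionSketch {

  /** Build a small subset of the standard token map. */
  static Map<String, String> standardTokens(String clusterName,
      String componentName, String containerId) {
    Map<String, String> tokens = new HashMap<>();
    tokens.put("${CLUSTER_NAME}", clusterName);
    tokens.put("${CLUSTER_NAME.lc}", clusterName.toLowerCase());
    tokens.put("${COMPONENT_NAME}", componentName);
    tokens.put("${COMPONENT_NAME.lc}", componentName.toLowerCase());
    tokens.put("${CONTAINER_ID}", containerId);
    return tokens;
  }

  /** Replace every token occurrence in a single property value. */
  static String substitute(String value, Map<String, String> tokens) {
    for (Map.Entry<String, String> token : tokens.entrySet()) {
      value = value.replace(token.getKey(), token.getValue());
    }
    return value;
  }

  public static void main(String[] args) {
    Map<String, String> tokens =
        standardTokens("demo-app", "HBASE_MASTER", "container_01_000002");
    String raw = "/user/yarn/${CLUSTER_NAME}/${COMPONENT_NAME.lc}/run";
    System.out.println(substitute(raw, tokens));
    // /user/yarn/demo-app/hbase_master/run
  }
}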
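In the same spirit, addRoleRelatedTokens adds one ${<ROLE>_HOST} token per component, valued with a comma-separated list of the hosts currently running that component. A small sketch under the assumption that the role-to-hosts mapping has already been flattened to plain strings (the real code reads ClusterNode instances from the AM state):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoleHostTokensSketch {

  /** Add a ${<ROLE>_HOST} token for each role, joining its hosts with commas. */
  static void addRoleHostTokens(Map<String, String> tokens,
      Map<String, List<String>> hostsPerRole) {
    for (Map.Entry<String, List<String>> role : hostsPerRole.entrySet()) {
      String tokenName = role.getKey().toUpperCase() + "_HOST";
      tokens.put("${" + tokenName + "}", String.join(",", role.getValue()));
    }
  }

  public static void main(String[] args) {
    Map<String, List<String>> hostsPerRole = new HashMap<>();
    hostsPerRole.put("hbase_master", Arrays.asList("host1.example.com"));
    hostsPerRole.put("hbase_regionserver",
        Arrays.asList("host2.example.com", "host3.example.com"));
    Map<String, String> tokens = new HashMap<>();
    addRoleHostTokens(tokens, hostsPerRole);
    System.out.println(tokens.get("${HBASE_REGIONSERVER_HOST}"));
    // host2.example.com,host3.example.com
  }
}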
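Finally, dereferenceAllConfigs resolves cross-references of the form ${@//site/configDN/key} between the generated config files by repeated substitution until a fixed point is reached, skipping any value that still contains an unresolved reference as a substitution source. The standalone sketch below reproduces that idea with plain java.util types; the class name, config-type names and property values are invented for the example and this is not the code from the patch.

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class DereferenceSketch {

  /** Resolve ${@//site/<type>/<key>} references across all config buckets. */
  public static void dereference(Map<String, Map<String, String>> configurations) {
    // Build a lookup of ${@//site/type/key} -> value for every known property.
    Map<String, String> lookup = new HashMap<>();
    for (Map.Entry<String, Map<String, String>> type : configurations.entrySet()) {
      for (Map.Entry<String, String> prop : type.getValue().entrySet()) {
        lookup.put(String.format("${@//site/%s/%s}", type.getKey(), prop.getKey()),
            prop.getValue());
      }
    }

    // Phase 1: substitute inside the lookup table until nothing changes,
    // using only values that no longer contain unresolved references.
    boolean changed = true;
    while (changed) {
      changed = false;
      for (Map.Entry<String, String> entry : lookup.entrySet()) {
        String value = entry.getValue();
        if (value == null) {
          continue;
        }
        for (Map.Entry<String, String> ref : lookup.entrySet()) {
          if (ref.getValue() == null || ref.getValue().contains("${@//site/")) {
            continue;
          }
          if (value.contains(ref.getKey())) {
            value = value.replace(ref.getKey(), ref.getValue());
          }
        }
        if (!value.equals(entry.getValue())) {
          entry.setValue(value);
          changed = true;
        }
      }
    }

    // Phase 2: apply the resolved lookup table to every config bucket.
    for (Map<String, String> bucket : configurations.values()) {
      for (Map.Entry<String, String> prop : bucket.entrySet()) {
        String value = prop.getValue();
        if (value == null) {
          continue;
        }
        for (Map.Entry<String, String> ref : lookup.entrySet()) {
          if (ref.getValue() != null && !ref.getValue().contains("${@//site/")
              && value.contains(ref.getKey())) {
            value = value.replace(ref.getKey(), ref.getValue());
          }
        }
        prop.setValue(value);
      }
    }
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> configs = new TreeMap<>();
    Map<String, String> core = new HashMap<>();
    core.put("fs.defaultFS", "hdfs://nn:8020");
    Map<String, String> app = new HashMap<>();
    app.put("data.root", "${@//site/core-site/fs.defaultFS}/apps/demo");
    configs.put("core-site", core);
    configs.put("app-site", app);

    dereference(configs);
    System.out.println(app.get("data.root"));  // hdfs://nn:8020/apps/demo
  }
}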