Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 44A9C200BAC for ; Tue, 11 Oct 2016 22:37:10 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 43957160AF3; Tue, 11 Oct 2016 20:37:10 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 686A6160AE6 for ; Tue, 11 Oct 2016 22:37:07 +0200 (CEST) Received: (qmail 36944 invoked by uid 500); 11 Oct 2016 20:37:06 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 36925 invoked by uid 99); 11 Oct 2016 20:37:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Oct 2016 20:37:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0641CE049D; Tue, 11 Oct 2016 20:37:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianhe@apache.org To: common-commits@hadoop.apache.org Date: Tue, 11 Oct 2016 20:37:06 -0000 Message-Id: In-Reply-To: <0d5aa560fa2a4e0b9ebfc43a0d700608@git.apache.org> References: <0d5aa560fa2a4e0b9ebfc43a0d700608@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/50] [abbrv] hadoop git commit: YARN-5505. Create an agent-less docker provider in the native-services framework. Contributed by Billie Rinaldi archived-at: Tue, 11 Oct 2016 20:37:10 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ba59bf9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java index 8203cf0..fdc5be1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java @@ -82,6 +82,8 @@ import java.util.Set; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; +import static org.apache.slider.common.tools.SliderUtils.getApplicationDefinitionPath; + /** This class implements the client-side aspects of the agent deployer */ public class AgentClientProvider extends AbstractClientProvider implements AgentKeys, SliderKeys { @@ -132,13 +134,13 @@ public class AgentClientProvider extends AbstractClientProvider sliderFileSystem.verifyFileExists(appDefPath); String agentConf = instanceDefinition.getAppConfOperations(). - getGlobalOptions().getOption(AgentKeys.AGENT_CONF, ""); + getGlobalOptions().getOption(AGENT_CONF, ""); if (StringUtils.isNotEmpty(agentConf)) { sliderFileSystem.verifyFileExists(new Path(agentConf)); } String appHome = instanceDefinition.getAppConfOperations(). - getGlobalOptions().get(AgentKeys.PACKAGE_PATH); + getGlobalOptions().get(PACKAGE_PATH); if (SliderUtils.isUnset(appHome)) { String agentImage = instanceDefinition.getInternalOperations(). get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH); @@ -173,7 +175,7 @@ public class AgentClientProvider extends AbstractClientProvider } Set names = resources.getComponentNames(); - names.remove(SliderKeys.COMPONENT_AM); + names.remove(COMPONENT_AM); Map priorityMap = new HashMap(); for (String name : names) { @@ -271,7 +273,7 @@ public class AgentClientProvider extends AbstractClientProvider String agentImage = instanceDefinition.getInternalOperations(). get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH); if (SliderUtils.isUnset(agentImage)) { - Path agentPath = new Path(tempPath.getParent(), AgentKeys.PROVIDER_AGENT); + Path agentPath = new Path(tempPath.getParent(), PROVIDER_AGENT); log.info("Automatically uploading the agent tarball at {}", agentPath); fileSystem.getFileSystem().mkdirs(agentPath); if (ProviderUtils.addAgentTar(this, AGENT_TAR, fileSystem, agentPath)) { @@ -284,6 +286,12 @@ public class AgentClientProvider extends AbstractClientProvider @Override public Set getApplicationTags(SliderFileSystem fileSystem, + ConfTreeOperations appConf) throws SliderException { + return getApplicationTags(fileSystem, + getApplicationDefinitionPath(appConf)); + } + + public Set getApplicationTags(SliderFileSystem fileSystem, String appDef) throws SliderException { Set tags; Metainfo metaInfo = getMetainfo(fileSystem, appDef); @@ -437,19 +445,19 @@ public class AgentClientProvider extends AbstractClientProvider if (config != null) { try { clientRoot = config.getJSONObject("global") - .getString(AgentKeys.APP_CLIENT_ROOT); + .getString(APP_CLIENT_ROOT); } catch (JSONException e) { log.info("Couldn't read {} from provided client config, falling " + - "back on default", AgentKeys.APP_CLIENT_ROOT); + "back on default", APP_CLIENT_ROOT); } } if (clientRoot == null && defaultConfig != null) { try { clientRoot = defaultConfig.getJSONObject("global") - .getString(AgentKeys.APP_CLIENT_ROOT); + .getString(APP_CLIENT_ROOT); } catch (JSONException e) { log.info("Couldn't read {} from default client config, using {}", - AgentKeys.APP_CLIENT_ROOT, clientInstallPath); + APP_CLIENT_ROOT, clientInstallPath); } } if (clientRoot == null) { @@ -500,7 +508,7 @@ public class AgentClientProvider extends AbstractClientProvider try { String clientScriptPath = appPkgDir.getAbsolutePath() + File.separator + "package" + File.separator + clientScript; - List command = Arrays.asList(AgentKeys.PYTHON_EXE, + List command = Arrays.asList(PYTHON_EXE, "-S", clientScriptPath, "INSTALL", @@ -510,12 +518,12 @@ public class AgentClientProvider extends AbstractClientProvider "DEBUG"); ProcessBuilder pb = new ProcessBuilder(command); log.info("Command: " + StringUtils.join(pb.command(), " ")); - pb.environment().put(SliderKeys.PYTHONPATH, + pb.environment().put(PYTHONPATH, agentPkgDir.getAbsolutePath() + File.separator + "slider-agent" + File.pathSeparator + agentPkgDir.getAbsolutePath() + File.separator + "slider-agent/jinja2"); - log.info("{}={}", SliderKeys.PYTHONPATH, pb.environment().get(SliderKeys.PYTHONPATH)); + log.info("{}={}", PYTHONPATH, pb.environment().get(PYTHONPATH)); Process proc = pb.start(); InputStream stderr = proc.getErrorStream(); @@ -555,8 +563,8 @@ public class AgentClientProvider extends AbstractClientProvider private void expandAgentTar(File agentPkgDir) throws IOException { String libDirProp = - System.getProperty(SliderKeys.PROPERTY_LIB_DIR); - File tarFile = new File(libDirProp, SliderKeys.AGENT_TAR); + System.getProperty(PROPERTY_LIB_DIR); + File tarFile = new File(libDirProp, AGENT_TAR); expandTar(tarFile, agentPkgDir); } @@ -620,7 +628,7 @@ public class AgentClientProvider extends AbstractClientProvider String name) throws SliderException { try { JSONObject pkgList = new JSONObject(); - pkgList.put(AgentKeys.PACKAGE_LIST, + pkgList.put(PACKAGE_LIST, AgentProviderService.getPackageListFromApplication(metainfo.getApplication())); JSONObject obj = new JSONObject(); obj.put("hostLevelParams", pkgList); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ba59bf9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java index 9ea984c..c4228e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java @@ -23,6 +23,7 @@ package org.apache.slider.providers.agent; */ public interface AgentKeys { + String AGENT_TAR = "slider-agent.tar.gz"; String PROVIDER_AGENT = "agent"; String ROLE_NODE = "echo"; @@ -76,23 +77,13 @@ public interface AgentKeys { String AGENT_CONF = "agent.conf"; String ADDON_FOR_ALL_COMPONENTS = "ALL"; - String APP_RESOURCES = "application.resources"; - String APP_RESOURCES_DIR = "app/resources"; - - String APP_CONF_DIR = "app/conf"; - String AGENT_INSTALL_DIR = "infra/agent"; String APP_DEFINITION_DIR = "app/definition"; String ADDON_DEFINITION_DIR = "addon/definition"; String AGENT_CONFIG_FILE = "infra/conf/agent.ini"; String AGENT_VERSION_FILE = "infra/version"; - String APP_PACKAGES_DIR = "app/packages"; - String PER_COMPONENT = "per.component"; - String PER_GROUP = "per.group"; - String JAVA_HOME = "java_home"; String PACKAGE_LIST = "package_list"; - String SYSTEM_CONFIGS = "system_configs"; String WAIT_HEARTBEAT = "wait.heartbeat"; String PYTHON_EXE = "python"; String CREATE_DEF_ZK_NODE = "create.default.zookeeper.node"; @@ -104,7 +95,6 @@ public interface AgentKeys { String CERT_FILE_LOCALIZATION_PATH = INFRA_RUN_SECURITY_DIR + "ca.crt"; String KEY_CONTAINER_LAUNCH_DELAY = "container.launch.delay.sec"; String TEST_RELAX_VERIFICATION = "test.relax.validation"; - String AM_CONFIG_GENERATION = "am.config.generation"; String DEFAULT_METAINFO_MAP_KEY = "DEFAULT_KEY"; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ba59bf9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java index 2ab5c6f..499812e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -21,20 +21,11 @@ package org.apache.slider.providers.agent; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.registry.client.binding.RegistryPathUtils; import org.apache.hadoop.registry.client.types.Endpoint; import org.apache.hadoop.registry.client.types.ProtocolTypes; import org.apache.hadoop.registry.client.types.ServiceRecord; -import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; -import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.Container; @@ -55,7 +46,6 @@ import org.apache.slider.common.tools.SliderUtils; import org.apache.slider.core.conf.AggregateConf; import org.apache.slider.core.conf.ConfTreeOperations; import org.apache.slider.core.conf.MapOperations; -import org.apache.slider.core.exceptions.BadCommandArgumentsException; import org.apache.slider.core.exceptions.BadConfigException; import org.apache.slider.core.exceptions.NoSuchNodeException; import org.apache.slider.core.exceptions.SliderException; @@ -65,7 +55,6 @@ import org.apache.slider.core.registry.docstore.ConfigFormat; import org.apache.slider.core.registry.docstore.ConfigUtils; import org.apache.slider.core.registry.docstore.ExportEntry; import org.apache.slider.core.registry.docstore.PublishedConfiguration; -import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter; import org.apache.slider.core.registry.docstore.PublishedExports; import org.apache.slider.core.registry.info.CustomRegistryConstants; import org.apache.slider.providers.AbstractProviderService; @@ -107,20 +96,15 @@ import org.apache.slider.server.appmaster.web.rest.agent.Register; import org.apache.slider.server.appmaster.web.rest.agent.RegistrationResponse; import org.apache.slider.server.appmaster.web.rest.agent.RegistrationStatus; import org.apache.slider.server.appmaster.web.rest.agent.StatusCommand; -import org.apache.slider.server.services.security.CertificateManager; -import org.apache.slider.server.services.security.SecurityStore; -import org.apache.slider.server.services.security.StoresGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -157,10 +141,6 @@ public class AgentProviderService extends AbstractProviderService implements private static final String LABEL_MAKER = "___"; private static final String CONTAINER_ID = "container_id"; private static final String GLOBAL_CONFIG_TAG = "global"; - private static final String LOG_FOLDERS_TAG = "LogFolders"; - private static final String HOST_FOLDER_FORMAT = "%s:%s"; - private static final String CONTAINER_LOGS_TAG = "container_log_dirs"; - private static final String CONTAINER_PWDS_TAG = "container_work_dirs"; private static final String COMPONENT_TAG = "component"; private static final String APPLICATION_TAG = "application"; private static final String COMPONENT_DATA_TAG = "ComponentInstanceData"; @@ -249,13 +229,6 @@ public class AgentProviderService extends AbstractProviderService implements } @Override - public Configuration loadProviderConfigurationInformation(File confDir) throws - BadCommandArgumentsException, - IOException { - return new Configuration(false); - } - - @Override public void validateInstanceDefinition(AggregateConf instanceDefinition) throws SliderException { @@ -265,7 +238,7 @@ public class AgentProviderService extends AbstractProviderService implements instanceDefinition.getResourceOperations(); Set names = resources.getComponentNames(); - names.remove(SliderKeys.COMPONENT_AM); + names.remove(COMPONENT_AM); for (String name : names) { Component componentDef = getApplicationComponent(name); if (componentDef == null) { @@ -350,9 +323,9 @@ public class AgentProviderService extends AbstractProviderService implements // build a map from component to metainfo String addonAppDefString = instanceDefinition.getAppConfOperations() - .getGlobalOptions().getOption(AgentKeys.ADDONS, null); + .getGlobalOptions().getOption(ADDONS, null); if (component != null) { - addonAppDefString = component.getOption(AgentKeys.ADDONS, addonAppDefString); + addonAppDefString = component.getOption(ADDONS, addonAppDefString); } log.debug("All addon appdefs: {}", addonAppDefString); if (addonAppDefString != null) { @@ -415,6 +388,7 @@ public class AgentProviderService extends AbstractProviderService implements if (isYarnDockerContainer(roleGroup)) { launcher.setYarnDockerMode(true); launcher.setDockerImage(getConfigFromMetaInfo(roleGroup, "image")); + launcher.setDockerNetwork(getConfigFromMetaInfo(roleGroup, "network")); launcher.setRunPrivilegedContainer(getConfigFromMetaInfo(roleGroup, "runPriviledgedContainer")); launcher .setYarnContainerMountPoints(getConfigFromMetaInfoWithAppConfigOverriding( @@ -423,7 +397,9 @@ public class AgentProviderService extends AbstractProviderService implements // Set the environment launcher.putEnv(SliderUtils.buildEnvMap(appComponent, - getStandardTokenMap(getAmState().getAppConfSnapshot(), roleName, roleGroup))); + providerUtils.getStandardTokenMap(getAmState().getAppConfSnapshot(), + getAmState().getInternalsSnapshot(), roleName, roleGroup, + getClusterName()))); String workDir = ApplicationConstants.Environment.PWD.$(); launcher.setEnv("AGENT_WORK_ROOT", workDir); @@ -444,17 +420,17 @@ public class AgentProviderService extends AbstractProviderService implements //local resources // TODO: Should agent need to support App Home - String scriptPath = new File(AgentKeys.AGENT_MAIN_SCRIPT_ROOT, AgentKeys.AGENT_MAIN_SCRIPT).getPath(); + String scriptPath = new File(AGENT_MAIN_SCRIPT_ROOT, AGENT_MAIN_SCRIPT).getPath(); String appHome = instanceDefinition.getAppConfOperations(). - getGlobalOptions().get(AgentKeys.PACKAGE_PATH); + getGlobalOptions().get(PACKAGE_PATH); if (SliderUtils.isSet(appHome)) { - scriptPath = new File(appHome, AgentKeys.AGENT_MAIN_SCRIPT).getPath(); + scriptPath = new File(appHome, AGENT_MAIN_SCRIPT).getPath(); } // set PYTHONPATH List pythonPaths = new ArrayList(); - pythonPaths.add(AgentKeys.AGENT_MAIN_SCRIPT_ROOT); - pythonPaths.add(AgentKeys.AGENT_JINJA2_ROOT); + pythonPaths.add(AGENT_MAIN_SCRIPT_ROOT); + pythonPaths.add(AGENT_JINJA2_ROOT); String pythonPath = StringUtils.join(File.pathSeparator, pythonPaths); launcher.setEnv(PYTHONPATH, pythonPath); log.info("PYTHONPATH set to {}", pythonPath); @@ -466,21 +442,21 @@ public class AgentProviderService extends AbstractProviderService implements agentImagePath = new Path(new Path(new Path(instanceDefinition.getInternalOperations().get(InternalKeys.INTERNAL_TMP_DIR), container.getId().getApplicationAttemptId().getApplicationId().toString()), - AgentKeys.PROVIDER_AGENT), - SliderKeys.AGENT_TAR); + PROVIDER_AGENT), + AGENT_TAR); } else { agentImagePath = new Path(agentImage); } if (fileSystem.getFileSystem().exists(agentImagePath)) { LocalResource agentImageRes = fileSystem.createAmResource(agentImagePath, LocalResourceType.ARCHIVE); - launcher.addLocalResource(AgentKeys.AGENT_INSTALL_DIR, agentImageRes); + launcher.addLocalResource(AGENT_INSTALL_DIR, agentImageRes); } else { String msg = String.format("Required agent image slider-agent.tar.gz is unavailable at %s", agentImagePath.toString()); MapOperations compOps = appComponent; boolean relaxVerificationForTest = compOps != null ? Boolean.valueOf(compOps. - getOptionBool(AgentKeys.TEST_RELAX_VERIFICATION, false)) : false; + getOptionBool(TEST_RELAX_VERIFICATION, false)) : false; log.error(msg); if (!relaxVerificationForTest) { @@ -492,7 +468,7 @@ public class AgentProviderService extends AbstractProviderService implements LocalResource appDefRes = fileSystem.createAmResource( fileSystem.getFileSystem().resolvePath(new Path(appDef)), LocalResourceType.ARCHIVE); - launcher.addLocalResource(AgentKeys.APP_DEFINITION_DIR, appDefRes); + launcher.addLocalResource(APP_DEFINITION_DIR, appDefRes); for (Package pkg : getMetaInfo(roleGroup).getApplication().getPackages()) { Path pkgPath = fileSystem.buildResourcePath(pkg.getName()); @@ -511,57 +487,61 @@ public class AgentProviderService extends AbstractProviderService implements } LocalResource packageResource = fileSystem.createAmResource( pkgPath, type); - launcher.addLocalResource(AgentKeys.APP_PACKAGES_DIR, packageResource); + launcher.addLocalResource(APP_PACKAGES_DIR, packageResource); } String agentConf = instanceDefinition.getAppConfOperations(). - getGlobalOptions().getOption(AgentKeys.AGENT_CONF, ""); + getGlobalOptions().getOption(AGENT_CONF, ""); if (SliderUtils.isSet(agentConf)) { LocalResource agentConfRes = fileSystem.createAmResource(fileSystem .getFileSystem().resolvePath(new Path(agentConf)), LocalResourceType.FILE); - launcher.addLocalResource(AgentKeys.AGENT_CONFIG_FILE, agentConfRes); + launcher.addLocalResource(AGENT_CONFIG_FILE, agentConfRes); } String agentVer = instanceDefinition.getAppConfOperations(). - getGlobalOptions().getOption(AgentKeys.AGENT_VERSION, null); + getGlobalOptions().getOption(AGENT_VERSION, null); if (agentVer != null) { LocalResource agentVerRes = fileSystem.createAmResource( fileSystem.getFileSystem().resolvePath(new Path(agentVer)), LocalResourceType.FILE); - launcher.addLocalResource(AgentKeys.AGENT_VERSION_FILE, agentVerRes); + launcher.addLocalResource(AGENT_VERSION_FILE, agentVerRes); } if (SliderUtils.isHadoopClusterSecure(getConfig())) { - localizeServiceKeytabs(launcher, instanceDefinition, fileSystem); + providerUtils.localizeServiceKeytabs(launcher, instanceDefinition, + fileSystem, getClusterName()); } MapOperations amComponent = instanceDefinition. - getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM); - boolean twoWayEnabled = amComponent != null ? Boolean.valueOf(amComponent. - getOptionBool(AgentKeys.KEY_AGENT_TWO_WAY_SSL_ENABLED, false)) : false; - if (twoWayEnabled) { - localizeContainerSSLResources(launcher, container, fileSystem); + getAppConfOperations().getComponent(COMPONENT_AM); + if (providerUtils.hasTwoWaySSLEnabled(amComponent)) { + providerUtils.localizeContainerSSLResources(launcher, container, + fileSystem, getClusterName()); } - MapOperations compOps = appComponent; - if (areStoresRequested(compOps)) { - localizeContainerSecurityStores(launcher, container, roleName, fileSystem, - instanceDefinition, compOps); + if (providerUtils.areStoresRequested(appComponent)) { + providerUtils.localizeContainerSecurityStores(launcher, container, + roleName, fileSystem, instanceDefinition, appComponent, + getClusterName()); } //add the configuration resources launcher.addLocalResources(fileSystem.submitDirectory( generatedConfPath, - SliderKeys.PROPAGATED_CONF_DIR_NAME)); + PROPAGATED_CONF_DIR_NAME)); - if (appComponent.getOptionBool(AgentKeys.AM_CONFIG_GENERATION, false)) { + if (appComponent.getOptionBool(AM_CONFIG_GENERATION, false)) { // build and localize configuration files Map> configurations = buildCommandConfigurations(instanceDefinition.getAppConfOperations(), + instanceDefinition.getInternalOperations(), container.getId().toString(), roleName, roleGroup); - localizeConfigFiles(launcher, roleName, roleGroup, getMetaInfo(roleGroup), - configurations, launcher.getEnv(), fileSystem); + for (ConfigFile configFile : getMetaInfo(roleGroup) + .getComponentConfigFiles(roleGroup)) { + localizeConfigFile(launcher, roleName, roleGroup, configFile, + configurations, launcher.getEnv(), fileSystem); + } } String label = getContainerLabel(container, roleName, roleGroup); @@ -569,7 +549,7 @@ public class AgentProviderService extends AbstractProviderService implements String pythonExec = instanceDefinition.getAppConfOperations() .getGlobalOptions().getOption(SliderXmlConfKeys.PYTHON_EXECUTABLE_PATH, - AgentKeys.PYTHON_EXE); + PYTHON_EXE); operation.add(pythonExec); @@ -587,13 +567,13 @@ public class AgentProviderService extends AbstractProviderService implements } operation.add("> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" - + AgentKeys.AGENT_OUT_FILE + " 2>&1"); + + AGENT_OUT_FILE + " 2>&1"); launcher.addCommand(operation.build()); // localize addon package String addonAppDefString = instanceDefinition.getAppConfOperations() - .getGlobalOptions().getOption(AgentKeys.ADDONS, null); + .getGlobalOptions().getOption(ADDONS, null); log.debug("All addon appdefs: {}", addonAppDefString); if (addonAppDefString != null) { Scanner scanner = new Scanner(addonAppDefString).useDelimiter(","); @@ -605,7 +585,7 @@ public class AgentProviderService extends AbstractProviderService implements LocalResource addonPkgRes = fileSystem.createAmResource( fileSystem.getFileSystem().resolvePath(new Path(addonAppDefPath)), LocalResourceType.ARCHIVE); - launcher.addLocalResource(AgentKeys.ADDON_DEFINITION_DIR + "/" + addonAppDef, addonPkgRes); + launcher.addLocalResource(ADDON_DEFINITION_DIR + "/" + addonAppDef, addonPkgRes); } log.debug("Metainfo map for master and addon: {}", packageMetainfo.toString()); @@ -613,7 +593,7 @@ public class AgentProviderService extends AbstractProviderService implements // Additional files to localize in addition to the application def String appResourcesString = instanceDefinition.getAppConfOperations() - .getGlobalOptions().getOption(AgentKeys.APP_RESOURCES, null); + .getGlobalOptions().getOption(APP_RESOURCES, null); log.info("Configuration value for extra resources to localize: {}", appResourcesString); if (null != appResourcesString) { try (Scanner scanner = new Scanner(appResourcesString).useDelimiter(",")) { @@ -623,7 +603,7 @@ public class AgentProviderService extends AbstractProviderService implements LocalResource extraResource = fileSystem.createAmResource( fileSystem.getFileSystem().resolvePath(resourcePath), LocalResourceType.FILE); - String destination = AgentKeys.APP_RESOURCES_DIR + "/" + resourcePath.getName(); + String destination = APP_RESOURCES_DIR + "/" + resourcePath.getName(); log.info("Localizing {} to {}", resourcePath, destination); // TODO Can we try harder to avoid collisions? launcher.addLocalResource(destination, extraResource); @@ -641,7 +621,7 @@ public class AgentProviderService extends AbstractProviderService implements log.debug("Current component: {} component in metainfo: {}", roleName, comp.getName()); if (comp.getName().equals(roleGroup) - || comp.getName().equals(AgentKeys.ADDON_FOR_ALL_COMPONENTS)) { + || comp.getName().equals(ADDON_FOR_ALL_COMPONENTS)) { pkgStatuses.put(appPkg.getApplicationPackage().getName(), State.INIT); } } @@ -658,253 +638,18 @@ public class AgentProviderService extends AbstractProviderService implements pkgStatuses)); } - private void localizeContainerSecurityStores(ContainerLauncher launcher, - Container container, - String role, - SliderFileSystem fileSystem, - AggregateConf instanceDefinition, - MapOperations compOps) - throws SliderException, IOException { - // substitute CLUSTER_NAME into credentials - Map> newcred = new HashMap<>(); - for (Entry> entry : instanceDefinition.getAppConf().credentials.entrySet()) { - List resultList = new ArrayList<>(); - for (String v : entry.getValue()) { - resultList.add(v.replaceAll(Pattern.quote("${CLUSTER_NAME}"), - clusterName).replaceAll(Pattern.quote("${CLUSTER}"), - clusterName)); - } - newcred.put(entry.getKey().replaceAll(Pattern.quote("${CLUSTER_NAME}"), - clusterName).replaceAll(Pattern.quote("${CLUSTER}"), - clusterName), - resultList); - } - instanceDefinition.getAppConf().credentials = newcred; - - // generate and localize security stores - SecurityStore[] stores = generateSecurityStores(container, role, - instanceDefinition, compOps); - for (SecurityStore store : stores) { - LocalResource keystoreResource = fileSystem.createAmResource( - uploadSecurityResource(store.getFile(), fileSystem), LocalResourceType.FILE); - launcher.addLocalResource(String.format("secstores/%s-%s.p12", - store.getType(), role), - keystoreResource); - } - } - - private SecurityStore[] generateSecurityStores(Container container, - String role, - AggregateConf instanceDefinition, - MapOperations compOps) - throws SliderException, IOException { - return StoresGenerator.generateSecurityStores(container.getNodeId().getHost(), - container.getId().toString(), role, - instanceDefinition, compOps); - } - - private boolean areStoresRequested(MapOperations compOps) { - return compOps != null ? compOps. - getOptionBool(SliderKeys.COMP_STORES_REQUIRED_KEY, false) : false; - } - - private void localizeContainerSSLResources(ContainerLauncher launcher, - Container container, - SliderFileSystem fileSystem) - throws SliderException { - try { - // localize server cert - Path certsDir = fileSystem.buildClusterSecurityDirPath(getClusterName()); - LocalResource certResource = fileSystem.createAmResource( - new Path(certsDir, SliderKeys.CRT_FILE_NAME), - LocalResourceType.FILE); - launcher.addLocalResource(AgentKeys.CERT_FILE_LOCALIZATION_PATH, - certResource); - - // generate and localize agent cert - CertificateManager certMgr = new CertificateManager(); - String hostname = container.getNodeId().getHost(); - String containerId = container.getId().toString(); - certMgr.generateContainerCertificate(hostname, containerId); - LocalResource agentCertResource = fileSystem.createAmResource( - uploadSecurityResource( - CertificateManager.getAgentCertficateFilePath(containerId), - fileSystem), LocalResourceType.FILE); - // still using hostname as file name on the agent side, but the files - // do end up under the specific container's file space - launcher.addLocalResource(AgentKeys.INFRA_RUN_SECURITY_DIR + hostname + - ".crt", agentCertResource); - LocalResource agentKeyResource = fileSystem.createAmResource( - uploadSecurityResource( - CertificateManager.getAgentKeyFilePath(containerId), fileSystem), - LocalResourceType.FILE); - launcher.addLocalResource(AgentKeys.INFRA_RUN_SECURITY_DIR + hostname + - ".key", agentKeyResource); - - } catch (Exception e) { - throw new SliderException(SliderExitCodes.EXIT_DEPLOYMENT_FAILED, e, - "Unable to localize certificates. Two-way SSL cannot be enabled"); - } - } - - private Path uploadSecurityResource(File resource, SliderFileSystem fileSystem) - throws IOException { - Path certsDir = fileSystem.buildClusterSecurityDirPath(getClusterName()); - return uploadResource(resource, fileSystem, certsDir); - } - - private Path uploadResource(File resource, SliderFileSystem fileSystem, - String roleName) throws IOException { - Path dir; - if (roleName == null) { - dir = fileSystem.buildClusterResourcePath(getClusterName()); - } else { - dir = fileSystem.buildClusterResourcePath(getClusterName(), roleName); - } - return uploadResource(resource, fileSystem, dir); - } - - private static synchronized Path uploadResource(File resource, - SliderFileSystem fileSystem, Path parentDir) throws IOException { - if (!fileSystem.getFileSystem().exists(parentDir)) { - fileSystem.getFileSystem().mkdirs(parentDir, - new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE)); - } - Path destPath = new Path(parentDir, resource.getName()); - if (!fileSystem.getFileSystem().exists(destPath)) { - FSDataOutputStream os = null; - try { - os = fileSystem.getFileSystem().create(destPath); - byte[] contents = FileUtils.readFileToByteArray(resource); - os.write(contents, 0, contents.length); - os.flush(); - } finally { - IOUtils.closeStream(os); - } - log.info("Uploaded {} to localization path {}", resource, destPath); - } else { - log.info("Resource {} already existed at localization path {}", resource, - destPath); - } - - while (!fileSystem.getFileSystem().exists(destPath)) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - // ignore - } - } - - fileSystem.getFileSystem().setPermission(destPath, - new FsPermission(FsAction.READ, FsAction.NONE, FsAction.NONE)); - - return destPath; - } - - private void localizeServiceKeytabs(ContainerLauncher launcher, - AggregateConf instanceDefinition, - SliderFileSystem fileSystem) - throws IOException { - String keytabPathOnHost = instanceDefinition.getAppConfOperations() - .getComponent(SliderKeys.COMPONENT_AM).get( - SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH); - if (SliderUtils.isUnset(keytabPathOnHost)) { - String amKeytabName = instanceDefinition.getAppConfOperations() - .getComponent(SliderKeys.COMPONENT_AM).get( - SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME); - String keytabDir = instanceDefinition.getAppConfOperations() - .getComponent(SliderKeys.COMPONENT_AM).get( - SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR); - // we need to localize the keytab files in the directory - Path keytabDirPath = fileSystem.buildKeytabPath(keytabDir, null, - getClusterName()); - boolean serviceKeytabsDeployed = false; - if (fileSystem.getFileSystem().exists(keytabDirPath)) { - FileStatus[] keytabs = fileSystem.getFileSystem().listStatus(keytabDirPath); - LocalResource keytabRes; - for (FileStatus keytab : keytabs) { - if (!amKeytabName.equals(keytab.getPath().getName()) - && keytab.getPath().getName().endsWith(".keytab")) { - serviceKeytabsDeployed = true; - log.info("Localizing keytab {}", keytab.getPath().getName()); - keytabRes = fileSystem.createAmResource(keytab.getPath(), - LocalResourceType.FILE); - launcher.addLocalResource(SliderKeys.KEYTAB_DIR + "/" + - keytab.getPath().getName(), - keytabRes); - } - } - } - if (!serviceKeytabsDeployed) { - log.warn("No service keytabs for the application have been localized. " - + "If the application requires keytabs for secure operation, " - + "please ensure that the required keytabs have been uploaded " - + "to the folder {}", keytabDirPath); - } - } - } - - private void createConfigFile(SliderFileSystem fileSystem, File file, - ConfigFile configFile, Map config) - throws IOException { - ConfigFormat configFormat = ConfigFormat.resolve(configFile.getType()); - log.info("Writing {} file {}", configFormat, file); - - ConfigUtils.prepConfigForTemplateOutputter(configFormat, config, - fileSystem, getClusterName(), file.getName()); - PublishedConfiguration publishedConfiguration = - new PublishedConfiguration(configFile.getDictionaryName(), - config.entrySet()); - PublishedConfigurationOutputter configurationOutputter = - PublishedConfigurationOutputter.createOutputter(configFormat, - publishedConfiguration); - configurationOutputter.save(file); - } - @VisibleForTesting - protected void localizeConfigFiles(ContainerLauncher launcher, + protected void localizeConfigFile(ContainerLauncher launcher, String roleName, String roleGroup, - Metainfo metainfo, + ConfigFile configFile, Map> configs, MapOperations env, SliderFileSystem fileSystem) throws IOException { - for (ConfigFile configFile : metainfo.getComponentConfigFiles(roleGroup)) { - Map config = ConfigUtils.replacePropsInConfig( - configs.get(configFile.getDictionaryName()), env.options); - String fileName = ConfigUtils.replaceProps(config, - configFile.getFileName()); - File localFile = new File(SliderKeys.RESOURCE_DIR); - if (!localFile.exists()) { - localFile.mkdir(); - } - localFile = new File(localFile, new File(fileName).getName()); - - String folder = null; - if ("true".equals(config.get(PER_COMPONENT))) { - folder = roleName; - } else if ("true".equals(config.get(PER_GROUP))) { - folder = roleGroup; - } - - log.info("Localizing {} configs to config file {} (destination {}) " + - "based on {} configs", config.size(), localFile, fileName, - configFile.getDictionaryName()); - createConfigFile(fileSystem, localFile, configFile, config); - Path destPath = uploadResource(localFile, fileSystem, folder); - LocalResource configResource = fileSystem.createAmResource(destPath, - LocalResourceType.FILE); - - File destFile = new File(fileName); - if (destFile.isAbsolute()) { - launcher.addLocalResource( - SliderKeys.RESOURCE_DIR + "/" + destFile.getName(), - configResource, fileName); - } else { - launcher.addLocalResource(AgentKeys.APP_CONF_DIR + "/" + fileName, - configResource); - } - } + ConfigFormat configFormat = ConfigFormat.resolve(configFile.getType()); + providerUtils.localizeConfigFile(launcher, roleName, roleGroup, + configFile.getDictionaryName(), configFormat, configFile.getFileName(), + configs, env, fileSystem, getClusterName()); } /** @@ -1053,7 +798,6 @@ public class AgentProviderService extends AbstractProviderService implements doUpgrade = true; } - StateAccessForProviders accessor = getAmState(); CommandScript cmdScript = getScriptPathForMasterPackage(roleGroup); List commands = getApplicationComponent(roleGroup).getCommands(); @@ -1089,45 +833,10 @@ public class AgentProviderService extends AbstractProviderService implements List statuses = heartBeat.getComponentStatus(); if (statuses != null && !statuses.isEmpty()) { log.info("status from agent: " + statuses.toString()); - try { - for(ComponentStatus status : statuses){ - RoleInstance role = null; - if(status.getIp() != null && !status.getIp().isEmpty()){ - role = amState.getOwnedContainer(containerId); - role.ip = status.getIp(); - } - if(status.getHostname() != null && !status.getHostname().isEmpty()){ - role = amState.getOwnedContainer(containerId); - role.hostname = status.getHostname(); - } - if (role != null) { - // create an updated service record (including hostname and ip) and publish... - ServiceRecord record = new ServiceRecord(); - record.set(YarnRegistryAttributes.YARN_ID, containerId); - record.description = roleName; - record.set(YarnRegistryAttributes.YARN_PERSISTENCE, - PersistencePolicies.CONTAINER); - // TODO: switch record attributes to use constants from YarnRegistryAttributes - // when it's been updated. - if (role.ip != null) { - record.set("yarn:ip", role.ip); - } - if (role.hostname != null) { - record.set("yarn:hostname", role.hostname); - } - yarnRegistry.putComponent( - RegistryPathUtils.encodeYarnID(containerId), record); - - } - } - - - } catch (NoSuchNodeException e) { - // ignore - there is nothing to do if we don't find a container - log.warn("Owned container {} not found - {}", containerId, e); - } catch (IOException e) { - log.warn("Error updating container {} service record in registry", - containerId, e); + for(ComponentStatus status : statuses){ + providerUtils.updateServiceRecord(getAmState(), yarnRegistry, + containerId, roleName, Collections.singletonList(status.getIp()), + status.getHostname()); } } @@ -1179,8 +888,8 @@ public class AgentProviderService extends AbstractProviderService implements } } - int waitForCount = accessor.getInstanceDefinitionSnapshot(). - getAppConfOperations().getComponentOptInt(roleGroup, AgentKeys.WAIT_HEARTBEAT, 0); + int waitForCount = getAmState().getInstanceDefinitionSnapshot(). + getAppConfOperations().getComponentOptInt(roleGroup, WAIT_HEARTBEAT, 0); if (id < waitForCount) { log.info("Waiting until heartbeat count {}. Current val: {}", waitForCount, id); @@ -1223,7 +932,7 @@ public class AgentProviderService extends AbstractProviderService implements log.debug("Addon component: {} pkg: {} script: {}", comp.getName(), nextPkgToInstall, comp.getCommandScript().getScript()); if (comp.getName().equals(roleGroup) - || comp.getName().equals(AgentKeys.ADDON_FOR_ALL_COMPONENTS)) { + || comp.getName().equals(ADDON_FOR_ALL_COMPONENTS)) { scriptPath = comp.getCommandScript().getScript(); if (scriptPath != null) { addInstallCommand(roleName, roleGroup, containerId, response, @@ -1406,15 +1115,12 @@ public class AgentProviderService extends AbstractProviderService implements return details; } - @Override public void applyInitialRegistryDefinitions(URL amWebURI, URL agentOpsURI, URL agentStatusURI, ServiceRecord serviceRecord) throws IOException { super.applyInitialRegistryDefinitions(amWebURI, - agentOpsURI, - agentStatusURI, serviceRecord); try { @@ -1450,20 +1156,17 @@ public class AgentProviderService extends AbstractProviderService implements ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); MapOperations clientOperations = appConf.getOrAddComponent(client.getName()); appConf.resolve(); - if (!clientOperations.getOptionBool(AgentKeys.AM_CONFIG_GENERATION, + if (!clientOperations.getOptionBool(AM_CONFIG_GENERATION, false)) { log.info("AM config generation is false, not publishing client configs"); return; } // build and localize configuration files - Map> configurations = new TreeMap>(); - Map tokens = null; - try { - tokens = getStandardTokenMap(appConf, client.getName(), client.getName()); - } catch (SliderException e) { - throw new IOException(e); - } + Map> configurations = new TreeMap<>(); + Map tokens = providerUtils.getStandardTokenMap(appConf, + getAmState().getInternalsSnapshot(), client.getName(), + client.getName(), getClusterName()); for (ConfigFile configFile : getMetaInfo().getComponentConfigFiles(client.getName())) { addNamedConfiguration(configFile.getDictionaryName(), @@ -1561,7 +1264,7 @@ public class AgentProviderService extends AbstractProviderService implements */ private void readAndSetHeartbeatMonitoringInterval(AggregateConf instanceDefinition) { String hbMonitorInterval = instanceDefinition.getAppConfOperations(). - getGlobalOptions().getOption(AgentKeys.HEARTBEAT_MONITOR_INTERVAL, + getGlobalOptions().getOption(HEARTBEAT_MONITOR_INTERVAL, Integer.toString(DEFAULT_HEARTBEAT_MONITOR_INTERVAL)); try { setHeartbeatMonitorInterval(Integer.parseInt(hbMonitorInterval)); @@ -1581,7 +1284,7 @@ public class AgentProviderService extends AbstractProviderService implements */ private void initializeAgentDebugCommands(AggregateConf instanceDefinition) { String launchParameterStr = instanceDefinition.getAppConfOperations(). - getGlobalOptions().getOption(AgentKeys.AGENT_INSTANCE_DEBUG_DATA, ""); + getGlobalOptions().getOption(AGENT_INSTANCE_DEBUG_DATA, ""); agentLaunchParameter = new AgentLaunchParameter(launchParameterStr); } @@ -1699,20 +1402,11 @@ public class AgentProviderService extends AbstractProviderService implements return clusterName; } - /** - * Publish a named property bag that may contain name-value pairs for app configurations such as hbase-site - * - * @param name - * @param description - * @param entries - */ + @VisibleForTesting protected void publishApplicationInstanceData(String name, String description, Iterable> entries) { - PublishedConfiguration pubconf = new PublishedConfiguration(); - pubconf.description = description; - pubconf.putValues(entries); - log.info("publishing {}", pubconf); - getAmState().getPublishedSliderConfigurations().put(name, pubconf); + providerUtils.publishApplicationInstanceData(name, description, entries, + getAmState()); } /** @@ -1771,72 +1465,14 @@ public class AgentProviderService extends AbstractProviderService implements return stats; } - - /** - * Format the folder locations and publish in the registry service - * - * @param folders - * @param containerId - * @param hostFqdn - * @param componentName - */ + @VisibleForTesting protected void publishFolderPaths( Map folders, String containerId, String componentName, String hostFqdn) { - Date now = new Date(); - for (Map.Entry entry : folders.entrySet()) { - ExportEntry exportEntry = new ExportEntry(); - exportEntry.setValue(String.format(HOST_FOLDER_FORMAT, hostFqdn, entry.getValue())); - exportEntry.setContainerId(containerId); - exportEntry.setLevel(COMPONENT_TAG); - exportEntry.setTag(componentName); - exportEntry.setUpdatedTime(now.toString()); - if (entry.getKey().equals("AGENT_LOG_ROOT")) { - synchronized (logFolderExports) { - getLogFolderExports().put(containerId, exportEntry); - } - } else { - synchronized (workFolderExports) { - getWorkFolderExports().put(containerId, exportEntry); - } - } - log.info("Updating log and pwd folders for container {}", containerId); - } - - PublishedExports exports = new PublishedExports(CONTAINER_LOGS_TAG); - exports.setUpdated(now.getTime()); - synchronized (logFolderExports) { - updateExportsFromList(exports, getLogFolderExports()); - } - getAmState().getPublishedExportsSet().put(CONTAINER_LOGS_TAG, exports); - - exports = new PublishedExports(CONTAINER_PWDS_TAG); - exports.setUpdated(now.getTime()); - synchronized (workFolderExports) { - updateExportsFromList(exports, getWorkFolderExports()); - } - getAmState().getPublishedExportsSet().put(CONTAINER_PWDS_TAG, exports); + providerUtils.publishFolderPaths(folders, containerId, componentName, hostFqdn, + getAmState(), getLogFolderExports(), getWorkFolderExports()); } /** - * Update the export data from the map - * @param exports - * @param folderExports - */ - private void updateExportsFromList(PublishedExports exports, Map folderExports) { - Map> perComponentList = new HashMap>(); - for(Map.Entry logEntry : folderExports.entrySet()) - { - String componentName = logEntry.getValue().getTag(); - if (!perComponentList.containsKey(componentName)) { - perComponentList.put(componentName, new ArrayList()); - } - perComponentList.get(componentName).add(logEntry.getValue()); - } - exports.putValues(perComponentList.entrySet()); - } - - - /** * Process return status for component instances * * @param heartBeat @@ -1855,7 +1491,7 @@ public class AgentProviderService extends AbstractProviderService implements if ((!canAnyMasterPublishConfig(componentGroup) || canPublishConfig(componentGroup)) && !getAmState().getAppConfSnapshot().getComponentOptBool( - componentGroup, AgentKeys.AM_CONFIG_GENERATION, false)) { + componentGroup, AM_CONFIG_GENERATION, false)) { // If no Master can explicitly publish then publish if its a master // Otherwise, wait till the master that can publish is ready @@ -1899,7 +1535,8 @@ public class AgentProviderService extends AbstractProviderService implements // publish export groups if any Map replaceTokens = new HashMap(); for (Map.Entry> entry : getRoleClusterNodeMapping().entrySet()) { - String hostName = getHostsList(entry.getValue().values(), true).iterator().next(); + String hostName = providerUtils.getHostsList( + entry.getValue().values(), true).iterator().next(); replaceTokens.put(String.format(hostKeyFormat, entry.getKey().toUpperCase(Locale.ENGLISH)), hostName); } @@ -1967,28 +1604,24 @@ public class AgentProviderService extends AbstractProviderService implements } private void publishModifiedExportGroups(Set modifiedGroups) { - for (String groupName : modifiedGroups) { - Map> entries = this.exportGroups.get(groupName); - + for (String roleGroup : modifiedGroups) { + Map> entries = this.exportGroups.get(roleGroup); // Publish in old format for the time being Map simpleEntries = new HashMap(); - for (Map.Entry> entry : entries.entrySet()) { + for (Entry> entry : entries.entrySet()) { List exports = entry.getValue(); if (SliderUtils.isNotEmpty(exports)) { // there is no support for multiple exports per name - so extract only the first one simpleEntries.put(entry.getKey(), entry.getValue().get(0).getValue()); } } - if (!getAmState().getAppConfSnapshot().getComponentOptBool( - groupName, AgentKeys.AM_CONFIG_GENERATION, false)) { - publishApplicationInstanceData(groupName, groupName, - simpleEntries.entrySet()); - } + publishApplicationInstanceData(roleGroup, roleGroup, + simpleEntries.entrySet()); - PublishedExports exports = new PublishedExports(groupName); + PublishedExports exports = new PublishedExports(roleGroup); exports.setUpdated(new Date().getTime()); exports.putValues(entries.entrySet()); - getAmState().getPublishedExportsSet().put(groupName, exports); + getAmState().getPublishedExportsSet().put(roleGroup, exports); } } @@ -2310,7 +1943,8 @@ public class AgentProviderService extends AbstractProviderService implements cmd.setHostLevelParams(hostLevelParams); Map> configurations = - buildCommandConfigurations(appConf, containerId, roleName, roleGroup); + buildCommandConfigurations(appConf, getAmState().getInternalsSnapshot(), + containerId, roleName, roleGroup); cmd.setConfigurations(configurations); Map> componentConfigurations = buildComponentConfigurations(appConf); cmd.setComponentConfigurations(componentConfigurations); @@ -2359,7 +1993,8 @@ public class AgentProviderService extends AbstractProviderService implements cmd.setHostLevelParams(hostLevelParams); Map> configurations = buildCommandConfigurations( - appConf, containerId, roleName, roleGroup); + appConf, getAmState().getInternalsSnapshot(), containerId, roleName, + roleGroup); cmd.setConfigurations(configurations); Map> componentConfigurations = buildComponentConfigurations(appConf); cmd.setComponentConfigurations(componentConfigurations); @@ -2522,7 +2157,9 @@ public class AgentProviderService extends AbstractProviderService implements cmd.setCommandParams(commandParametersSet(scriptPath, timeout, false)); - Map> configurations = buildCommandConfigurations(appConf, containerId, roleName, roleGroup); + Map> configurations = + buildCommandConfigurations(appConf, getAmState().getInternalsSnapshot(), + containerId, roleName, roleGroup); cmd.setConfigurations(configurations); @@ -2557,7 +2194,8 @@ public class AgentProviderService extends AbstractProviderService implements cmd.setCommandParams(setCommandParameters(scriptPath, timeout, false)); Map> configurations = buildCommandConfigurations( - appConf, containerId, roleName, roleGroup); + appConf, getAmState().getInternalsSnapshot(), containerId, roleName, + roleGroup); Map dockerConfig = new HashMap(); String statusCommand = getConfigFromMetaInfoWithAppConfigOverriding(roleGroup, "statusCommand"); if (statusCommand == null) { @@ -2598,7 +2236,8 @@ public class AgentProviderService extends AbstractProviderService implements ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); Map> configurations = buildCommandConfigurations( - appConf, containerId, roleName, roleGroup); + appConf, getAmState().getInternalsSnapshot(), containerId, roleName, + roleGroup); Map dockerConfig = new HashMap(); String statusCommand = getConfigFromMetaInfoWithAppConfigOverriding(roleGroup, "statusCommand"); if (statusCommand == null) { @@ -2690,7 +2329,8 @@ public class AgentProviderService extends AbstractProviderService implements cmd.setCommandParams(setCommandParameters(startCommand, timeout, true)); Map> configurations = buildCommandConfigurations( - appConf, containerId, roleName, roleGroup); + appConf, getAmState().getInternalsSnapshot(), containerId, roleName, + roleGroup); Map> componentConfigurations = buildComponentConfigurations(appConf); cmd.setComponentConfigurations(componentConfigurations); @@ -2794,13 +2434,6 @@ public class AgentProviderService extends AbstractProviderService implements result = container.getNetwork(); } break; - case "useNetworkScript": - if (container.getUseNetworkScript() == null || container.getUseNetworkScript().isEmpty()) { - result = "yes"; - } else { - result = container.getUseNetworkScript(); - } - break; case "statusCommand": result = container.getStatusCommand(); break; @@ -2903,7 +2536,9 @@ public class AgentProviderService extends AbstractProviderService implements cmd.setRoleParams(roleParams); cmd.getRoleParams().put("auto_restart", Boolean.toString(isMarkedAutoRestart)); - Map> configurations = buildCommandConfigurations(appConf, containerId, roleName, roleGroup); + Map> configurations = + buildCommandConfigurations(appConf, internalsConf, containerId, + roleName, roleGroup); cmd.setConfigurations(configurations); Map> componentConfigurations = buildComponentConfigurations(appConf); cmd.setComponentConfigurations(componentConfigurations); @@ -2957,7 +2592,7 @@ public class AgentProviderService extends AbstractProviderService implements Map> configurationsStop = buildCommandConfigurations( - appConf, containerId, roleName, roleGroup); + appConf, internalsConf, containerId, roleName, roleGroup); cmdStop.setConfigurations(configurationsStop); response.addExecutionCommand(cmdStop); } @@ -2989,7 +2624,7 @@ public class AgentProviderService extends AbstractProviderService implements cmd.setCommandParams(commandParametersSet(scriptPath, timeout, true)); Map> configurations = buildCommandConfigurations( - appConf, containerId, roleName, roleGroup); + appConf, internalsConf, containerId, roleName, roleGroup); cmd.setConfigurations(configurations); response.addExecutionCommand(cmd); } @@ -3023,7 +2658,7 @@ public class AgentProviderService extends AbstractProviderService implements cmdStop.setCommandParams(commandParametersSet(scriptPath, timeout, true)); Map> configurationsStop = buildCommandConfigurations( - appConf, containerId, roleName, roleGroup); + appConf, internalsConf, containerId, roleName, roleGroup); cmdStop.setConfigurations(configurationsStop); response.addExecutionCommand(cmdStop); } @@ -3062,12 +2697,13 @@ public class AgentProviderService extends AbstractProviderService implements } private Map> buildCommandConfigurations( - ConfTreeOperations appConf, String containerId, String roleName, String roleGroup) + ConfTreeOperations appConf, ConfTreeOperations internalsConf, + String containerId, String roleName, String roleGroup) throws SliderException { - Map> configurations = - new TreeMap>(); - Map tokens = getStandardTokenMap(appConf, roleName, roleGroup); + Map> configurations = new TreeMap<>(); + Map tokens = providerUtils.getStandardTokenMap(appConf, + internalsConf, roleName, roleGroup, getClusterName()); tokens.put("${CONTAINER_ID}", containerId); Set configs = new HashSet(); @@ -3090,111 +2726,16 @@ public class AgentProviderService extends AbstractProviderService implements return configurations; } + @VisibleForTesting protected void dereferenceAllConfigs(Map> configurations) { - Map allConfigs = new HashMap(); - String lookupFormat = "${@//site/%s/%s}"; - for (String configType : configurations.keySet()) { - Map configBucket = configurations.get(configType); - for (String configName : configBucket.keySet()) { - allConfigs.put(String.format(lookupFormat, configType, configName), configBucket.get(configName)); - } - } - - boolean finished = false; - while (!finished) { - finished = true; - for (Map.Entry entry : allConfigs.entrySet()) { - String configValue = entry.getValue(); - for (Map.Entry lookUpEntry : allConfigs.entrySet()) { - String lookUpValue = lookUpEntry.getValue(); - if (lookUpValue.contains("${@//site/")) { - continue; - } - String lookUpKey = lookUpEntry.getKey(); - if (configValue != null && configValue.contains(lookUpKey)) { - configValue = configValue.replace(lookUpKey, lookUpValue); - } - } - if (!configValue.equals(entry.getValue())) { - finished = false; - allConfigs.put(entry.getKey(), configValue); - } - } - } - - for (String configType : configurations.keySet()) { - Map configBucket = configurations.get(configType); - for (Map.Entry entry: configBucket.entrySet()) { - String configName = entry.getKey(); - String configValue = entry.getValue(); - for (Map.Entry lookUpEntry : allConfigs.entrySet()) { - String lookUpValue = lookUpEntry.getValue(); - if (lookUpValue.contains("${@//site/")) { - continue; - } - String lookUpKey = lookUpEntry.getKey(); - if (configValue != null && configValue.contains(lookUpKey)) { - configValue = configValue.replace(lookUpKey, lookUpValue); - } - } - configBucket.put(configName, configValue); - } - } - } - - private Map getStandardTokenMap(ConfTreeOperations appConf, - String componentName, String componentGroup) throws SliderException { - Map tokens = new HashMap(); - String nnuri = appConf.get("site.fs.defaultFS"); - tokens.put("${NN_URI}", nnuri); - tokens.put("${NN_HOST}", URI.create(nnuri).getHost()); - tokens.put("${ZK_HOST}", appConf.get(OptionKeys.ZOOKEEPER_HOSTS)); - tokens.put("${DEFAULT_ZK_PATH}", appConf.get(OptionKeys.ZOOKEEPER_PATH)); - String prefix = appConf.getComponentOpt(componentGroup, ROLE_PREFIX, - null); - String dataDirSuffix = ""; - if (prefix == null) { - prefix = ""; - } else { - dataDirSuffix = "_" + SliderUtils.trimPrefix(prefix); - } - tokens.put("${DEFAULT_DATA_DIR}", getAmState() - .getInternalsSnapshot() - .getGlobalOptions() - .getMandatoryOption(InternalKeys.INTERNAL_DATA_DIR_PATH) + dataDirSuffix); - tokens.put("${JAVA_HOME}", appConf.get(AgentKeys.JAVA_HOME)); - tokens.put("${COMPONENT_NAME}", componentName); - tokens.put("${COMPONENT_NAME.lc}", componentName.toLowerCase()); - tokens.put("${COMPONENT_PREFIX}", prefix); - tokens.put("${COMPONENT_PREFIX.lc}", prefix.toLowerCase()); - if (!componentName.equals(componentGroup) && componentName.startsWith(componentGroup)) { - tokens.put("${COMPONENT_ID}", componentName.substring(componentGroup.length())); - } - tokens.put("${CLUSTER_NAME}", getClusterName()); - tokens.put("${CLUSTER_NAME.lc}", getClusterName().toLowerCase()); - tokens.put("${APP_NAME}", getClusterName()); - tokens.put("${APP_NAME.lc}", getClusterName().toLowerCase()); - tokens.put("${APP_COMPONENT_NAME}", componentName); - tokens.put("${APP_COMPONENT_NAME.lc}", componentName.toLowerCase()); - return tokens; + providerUtils.dereferenceAllConfigs(configurations); } @VisibleForTesting - protected List getSystemConfigurationsRequested(ConfTreeOperations appConf) { - List configList = new ArrayList(); - - String configTypes = appConf.get(AgentKeys.SYSTEM_CONFIGS); - if (configTypes != null && configTypes.length() > 0) { - String[] configs = configTypes.split(","); - for (String config : configs) { - configList.add(config.trim()); - } - } - - return new ArrayList(new HashSet(configList)); + protected Set getSystemConfigurationsRequested(ConfTreeOperations appConf) { + return providerUtils.getSystemConfigurationsRequested(appConf); } - @VisibleForTesting protected List getApplicationConfigurationTypes(String roleGroup) { List configList = new ArrayList(); @@ -3275,30 +2816,17 @@ public class AgentProviderService extends AbstractProviderService implements configurations.put(configName, config); } + @VisibleForTesting protected void addRoleRelatedTokens(Map tokens) { - for (Map.Entry> entry : getRoleClusterNodeMapping().entrySet()) { - String tokenName = entry.getKey().toUpperCase(Locale.ENGLISH) + "_HOST"; - String hosts = StringUtils.join(",", getHostsList(entry.getValue().values(), true)); - tokens.put("${" + tokenName + "}", hosts); - } - } - - private Iterable getHostsList(Collection values, - boolean hostOnly) { - List hosts = new ArrayList(); - for (ClusterNode cn : values) { - hosts.add(hostOnly ? cn.host : cn.host + "/" + cn.name); - } - - return hosts; + providerUtils.addRoleRelatedTokens(tokens, getAmState()); } private void addDefaultGlobalConfig(Map config, String containerId, String roleName) { config.put("app_log_dir", "${AGENT_LOG_ROOT}"); config.put("app_pid_dir", "${AGENT_WORK_ROOT}/app/run"); config.put("app_install_dir", "${AGENT_WORK_ROOT}/app/install"); - config.put("app_conf_dir", "${AGENT_WORK_ROOT}/" + AgentKeys.APP_CONF_DIR); - config.put("app_input_conf_dir", "${AGENT_WORK_ROOT}/" + SliderKeys.PROPAGATED_CONF_DIR_NAME); + config.put("app_conf_dir", "${AGENT_WORK_ROOT}/" + APP_CONF_DIR); + config.put("app_input_conf_dir", "${AGENT_WORK_ROOT}/" + PROPAGATED_CONF_DIR_NAME); config.put("app_container_id", containerId); config.put("app_container_tag", tags.getTag(roleName, containerId)); @@ -3315,7 +2843,8 @@ public class AgentProviderService extends AbstractProviderService implements for (Map.Entry> entry : getRoleClusterNodeMapping().entrySet()) { details.put(entry.getKey() + " Host(s)/Container(s)", - new MonitorDetail(getHostsList(entry.getValue().values(), false).toString(), false)); + new MonitorDetail(providerUtils.getHostsList( + entry.getValue().values(), false).toString(), false)); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ba59bf9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerClientProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerClientProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerClientProvider.java new file mode 100644 index 0000000..13473e5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerClientProvider.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.slider.providers.docker; + +import org.apache.hadoop.conf.Configuration; +import org.apache.slider.common.SliderKeys; +import org.apache.slider.common.tools.SliderFileSystem; +import org.apache.slider.core.conf.AggregateConf; +import org.apache.slider.core.conf.ConfTreeOperations; +import org.apache.slider.core.exceptions.BadConfigException; +import org.apache.slider.core.exceptions.SliderException; +import org.apache.slider.providers.AbstractClientProvider; +import org.apache.slider.providers.ProviderRole; +import org.apache.slider.providers.ProviderUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.slider.providers.docker.DockerKeys.DOCKER_IMAGE; + +public class DockerClientProvider extends AbstractClientProvider + implements SliderKeys { + + protected static final Logger log = + LoggerFactory.getLogger(DockerClientProvider.class); + private static final ProviderUtils providerUtils = new ProviderUtils(log); + protected static final String NAME = "docker"; + + public DockerClientProvider(Configuration conf) { + super(conf); + } + + @Override + public String getName() { + return NAME; + } + + @Override + public List getRoles() { + return Collections.emptyList(); + } + + @Override + public void validateInstanceDefinition(AggregateConf instanceDefinition, + SliderFileSystem fs) throws SliderException { + super.validateInstanceDefinition(instanceDefinition, fs); + + ConfTreeOperations appConf = instanceDefinition.getAppConfOperations(); + ConfTreeOperations resources = instanceDefinition.getResourceOperations(); + + for (String roleGroup : resources.getComponentNames()) { + if (roleGroup.equals(COMPONENT_AM)) { + continue; + } + if (appConf.getComponentOpt(roleGroup, DOCKER_IMAGE, null) == null && + appConf.getGlobalOptions().get(DOCKER_IMAGE) == null) { + throw new BadConfigException("Property " + DOCKER_IMAGE + " not " + + "specified for " + roleGroup); + } + + providerUtils.getPackages(roleGroup, appConf); + + if (appConf.getComponentOptBool(roleGroup, AM_CONFIG_GENERATION, false)) { + // build and localize configuration files + Map> configurations = + providerUtils.buildConfigurations(appConf, appConf, null, roleGroup, + roleGroup, null); + try { + providerUtils.localizeConfigFiles(null, roleGroup, roleGroup, appConf, + configurations, null, fs, null); + } catch (IOException e) { + throw new BadConfigException(e.toString()); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ba59bf9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerKeys.java new file mode 100644 index 0000000..40b73a2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerKeys.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.slider.providers.docker; + +public interface DockerKeys { + String PROVIDER_DOCKER = "docker"; + String DOCKER_PREFIX = "docker."; + String DOCKER_IMAGE = DOCKER_PREFIX + "image"; + String DOCKER_NETWORK = DOCKER_PREFIX + "network"; + String DOCKER_USE_PRIVILEGED = DOCKER_PREFIX + "usePrivileged"; + String DOCKER_START_COMMAND = DOCKER_PREFIX + "startCommand"; + + String DEFAULT_DOCKER_NETWORK = "bridge"; + + String OUT_FILE = "stdout.txt"; + String ERR_FILE = "stderr.txt"; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ba59bf9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerProviderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerProviderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerProviderFactory.java new file mode 100644 index 0000000..5d2592f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerProviderFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.slider.providers.docker; + +import org.apache.hadoop.conf.Configuration; +import org.apache.slider.providers.AbstractClientProvider; +import org.apache.slider.providers.ProviderService; +import org.apache.slider.providers.SliderProviderFactory; + +public class DockerProviderFactory extends SliderProviderFactory { + + public DockerProviderFactory() { + } + + public DockerProviderFactory(Configuration conf) { + super(conf); + } + + @Override + public AbstractClientProvider createClientProvider() { + return new DockerClientProvider(getConf()); + } + + @Override + public ProviderService createServerProvider() { + return new DockerProviderService(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org