Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2C97A200C5B for ; Tue, 4 Apr 2017 21:56:36 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2B427160B90; Tue, 4 Apr 2017 19:56:36 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 186CA160B77 for ; Tue, 4 Apr 2017 21:56:34 +0200 (CEST) Received: (qmail 717 invoked by uid 500); 4 Apr 2017 19:56:27 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 98172 invoked by uid 99); 4 Apr 2017 19:56:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Apr 2017 19:56:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4C53AE178B; Tue, 4 Apr 2017 19:56:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aengineer@apache.org To: common-commits@hadoop.apache.org Date: Tue, 04 Apr 2017 19:57:07 -0000 Message-Id: <05bd4544a6c243afaf479574bdaa83bb@git.apache.org> In-Reply-To: <83db38e6376c4b2a8806af407cf6bf57@git.apache.org> References: <83db38e6376c4b2a8806af407cf6bf57@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [43/50] [abbrv] hadoop git commit: MAPREDUCE-6824. TaskAttemptImpl#createCommonContainerLaunchContext is longer than 150 lines. Contributed by Chris Trezzo. archived-at: Tue, 04 Apr 2017 19:56:36 -0000 MAPREDUCE-6824. TaskAttemptImpl#createCommonContainerLaunchContext is longer than 150 lines. Contributed by Chris Trezzo. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/845529b3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/845529b3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/845529b3 Branch: refs/heads/HDFS-7240 Commit: 845529b3ab338e759665a687eb525fb2cccde7bf Parents: a4b5aa8 Author: Akira Ajisaka Authored: Mon Apr 3 13:06:24 2017 +0900 Committer: Akira Ajisaka Committed: Mon Apr 3 13:06:54 2017 +0900 ---------------------------------------------------------------------- .../v2/app/job/impl/TaskAttemptImpl.java | 285 ++++++++++--------- 1 file changed, 153 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/845529b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 4305824..9ea1b9a 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -755,7 +755,7 @@ public abstract class TaskAttemptImpl implements new HashMap(); // Application environment - Map environment = new HashMap(); + Map environment; // Service data Map serviceData = new HashMap(); @@ -763,157 +763,178 @@ public abstract class TaskAttemptImpl implements // Tokens ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[]{}); try { - FileSystem remoteFS = FileSystem.get(conf); - - // //////////// Set up JobJar to be localized properly on the remote NM. - String jobJar = conf.get(MRJobConfig.JAR); - if (jobJar != null) { - final Path jobJarPath = new Path(jobJar); - final FileSystem jobJarFs = FileSystem.get(jobJarPath.toUri(), conf); - Path remoteJobJar = jobJarPath.makeQualified(jobJarFs.getUri(), - jobJarFs.getWorkingDirectory()); - LocalResource rc = createLocalResource(jobJarFs, remoteJobJar, - LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION); - String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, - JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern(); - rc.setPattern(pattern); - localResources.put(MRJobConfig.JOB_JAR, rc); - LOG.info("The job-jar file on the remote FS is " - + remoteJobJar.toUri().toASCIIString()); - } else { - // Job jar may be null. For e.g, for pipes, the job jar is the hadoop - // mapreduce jar itself which is already on the classpath. - LOG.info("Job jar is not present. " - + "Not adding any jar to the list of resources."); - } - // //////////// End of JobJar setup - - // //////////// Set up JobConf to be localized properly on the remote NM. - Path path = - MRApps.getStagingAreaDir(conf, UserGroupInformation - .getCurrentUser().getShortUserName()); - Path remoteJobSubmitDir = - new Path(path, oldJobId.toString()); - Path remoteJobConfPath = - new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE); - localResources.put( - MRJobConfig.JOB_CONF_FILE, - createLocalResource(remoteFS, remoteJobConfPath, - LocalResourceType.FILE, LocalResourceVisibility.APPLICATION)); - LOG.info("The job-conf file on the remote FS is " - + remoteJobConfPath.toUri().toASCIIString()); - // //////////// End of JobConf setup - // Setup DistributedCache - MRApps.setupDistributedCache(conf, localResources); + configureJobJar(conf, localResources); - // Setup up task credentials buffer - LOG.info("Adding #" + credentials.numberOfTokens() - + " tokens and #" + credentials.numberOfSecretKeys() - + " secret keys for NM use for launching container"); - Credentials taskCredentials = new Credentials(credentials); + configureJobConf(conf, localResources, oldJobId); - // LocalStorageToken is needed irrespective of whether security is enabled - // or not. - TokenCache.setJobToken(jobToken, taskCredentials); + // Setup DistributedCache + MRApps.setupDistributedCache(conf, localResources); - DataOutputBuffer containerTokens_dob = new DataOutputBuffer(); - LOG.info("Size of containertokens_dob is " - + taskCredentials.numberOfTokens()); - taskCredentials.writeTokenStorageToStream(containerTokens_dob); taskCredentialsBuffer = - ByteBuffer.wrap(containerTokens_dob.getData(), 0, - containerTokens_dob.getLength()); - - // Add shuffle secret key - // The secret key is converted to a JobToken to preserve backwards - // compatibility with an older ShuffleHandler running on an NM. - LOG.info("Putting shuffle token in serviceData"); - byte[] shuffleSecret = TokenCache.getShuffleSecretKey(credentials); - if (shuffleSecret == null) { - LOG.warn("Cannot locate shuffle secret in credentials." - + " Using job token as shuffle secret."); - shuffleSecret = jobToken.getPassword(); - } - Token shuffleToken = new Token( - jobToken.getIdentifier(), shuffleSecret, jobToken.getKind(), - jobToken.getService()); - serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, - ShuffleHandler.serializeServiceData(shuffleToken)); - - // add external shuffle-providers - if any - Collection shuffleProviders = conf.getStringCollection( - MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES); - if (! shuffleProviders.isEmpty()) { - Collection auxNames = conf.getStringCollection( - YarnConfiguration.NM_AUX_SERVICES); - - for (final String shuffleProvider : shuffleProviders) { - if (shuffleProvider.equals(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID)) { - continue; // skip built-in shuffle-provider that was already inserted with shuffle secret key - } - if (auxNames.contains(shuffleProvider)) { - LOG.info("Adding ShuffleProvider Service: " + shuffleProvider + " to serviceData"); - // This only serves for INIT_APP notifications - // The shuffle service needs to be able to work with the host:port information provided by the AM - // (i.e. shuffle services which require custom location / other configuration are not supported) - serviceData.put(shuffleProvider, ByteBuffer.allocate(0)); - } - else { - throw new YarnRuntimeException("ShuffleProvider Service: " + shuffleProvider + - " was NOT found in the list of aux-services that are available in this NM." + - " You may need to specify this ShuffleProvider as an aux-service in your yarn-site.xml"); - } - } - } + configureTokens(jobToken, credentials, serviceData); - MRApps.addToEnvironment( - environment, - Environment.CLASSPATH.name(), - getInitialClasspath(conf), conf); + addExternalShuffleProviders(conf, serviceData); + + environment = configureEnv(conf); - if (initialAppClasspath != null) { - MRApps.addToEnvironment( - environment, - Environment.APP_CLASSPATH.name(), - initialAppClasspath, conf); - } } catch (IOException e) { throw new YarnRuntimeException(e); } - // Shell - environment.put( - Environment.SHELL.name(), - conf.get( - MRJobConfig.MAPRED_ADMIN_USER_SHELL, - MRJobConfig.DEFAULT_SHELL) - ); - - // Add pwd to LD_LIBRARY_PATH, add this before adding anything else - MRApps.addToEnvironment( - environment, - Environment.LD_LIBRARY_PATH.name(), - MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf); - - // Add the env variables passed by the admin - MRApps.setEnvFromInputString( - environment, - conf.get( - MRJobConfig.MAPRED_ADMIN_USER_ENV, - MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV), conf - ); - // Construct the actual Container // The null fields are per-container and will be constructed for each // container separately. ContainerLaunchContext container = ContainerLaunchContext.newInstance(localResources, environment, null, - serviceData, taskCredentialsBuffer, applicationACLs); + serviceData, taskCredentialsBuffer, applicationACLs); return container; } + private static Map configureEnv(Configuration conf) + throws IOException { + Map environment = new HashMap(); + MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(), + getInitialClasspath(conf), conf); + + if (initialAppClasspath != null) { + MRApps.addToEnvironment(environment, Environment.APP_CLASSPATH.name(), + initialAppClasspath, conf); + } + + // Shell + environment.put(Environment.SHELL.name(), conf + .get(MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL)); + + // Add pwd to LD_LIBRARY_PATH, add this before adding anything else + MRApps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(), + MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf); + + // Add the env variables passed by the admin + MRApps.setEnvFromInputString(environment, + conf.get(MRJobConfig.MAPRED_ADMIN_USER_ENV, + MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV), + conf); + return environment; + } + + private static void configureJobJar(Configuration conf, + Map localResources) throws IOException { + // Set up JobJar to be localized properly on the remote NM. + String jobJar = conf.get(MRJobConfig.JAR); + if (jobJar != null) { + final Path jobJarPath = new Path(jobJar); + final FileSystem jobJarFs = FileSystem.get(jobJarPath.toUri(), conf); + Path remoteJobJar = jobJarPath.makeQualified(jobJarFs.getUri(), + jobJarFs.getWorkingDirectory()); + LocalResource rc = createLocalResource(jobJarFs, remoteJobJar, + LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION); + String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, + JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern(); + rc.setPattern(pattern); + localResources.put(MRJobConfig.JOB_JAR, rc); + LOG.info("The job-jar file on the remote FS is " + + remoteJobJar.toUri().toASCIIString()); + } else { + // Job jar may be null. For e.g, for pipes, the job jar is the hadoop + // mapreduce jar itself which is already on the classpath. + LOG.info("Job jar is not present. " + + "Not adding any jar to the list of resources."); + } + } + + private static void configureJobConf(Configuration conf, + Map localResources, + final org.apache.hadoop.mapred.JobID oldJobId) throws IOException { + // Set up JobConf to be localized properly on the remote NM. + Path path = MRApps.getStagingAreaDir(conf, + UserGroupInformation.getCurrentUser().getShortUserName()); + Path remoteJobSubmitDir = new Path(path, oldJobId.toString()); + Path remoteJobConfPath = + new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE); + FileSystem remoteFS = FileSystem.get(conf); + localResources.put(MRJobConfig.JOB_CONF_FILE, + createLocalResource(remoteFS, remoteJobConfPath, LocalResourceType.FILE, + LocalResourceVisibility.APPLICATION)); + LOG.info("The job-conf file on the remote FS is " + + remoteJobConfPath.toUri().toASCIIString()); + } + + private static ByteBuffer configureTokens(Token jobToken, + Credentials credentials, + Map serviceData) throws IOException { + // Setup up task credentials buffer + LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #" + + credentials.numberOfSecretKeys() + + " secret keys for NM use for launching container"); + Credentials taskCredentials = new Credentials(credentials); + + // LocalStorageToken is needed irrespective of whether security is enabled + // or not. + TokenCache.setJobToken(jobToken, taskCredentials); + + DataOutputBuffer containerTokens_dob = new DataOutputBuffer(); + LOG.info( + "Size of containertokens_dob is " + taskCredentials.numberOfTokens()); + taskCredentials.writeTokenStorageToStream(containerTokens_dob); + ByteBuffer taskCredentialsBuffer = + ByteBuffer.wrap(containerTokens_dob.getData(), 0, + containerTokens_dob.getLength()); + + // Add shuffle secret key + // The secret key is converted to a JobToken to preserve backwards + // compatibility with an older ShuffleHandler running on an NM. + LOG.info("Putting shuffle token in serviceData"); + byte[] shuffleSecret = TokenCache.getShuffleSecretKey(credentials); + if (shuffleSecret == null) { + LOG.warn("Cannot locate shuffle secret in credentials." + + " Using job token as shuffle secret."); + shuffleSecret = jobToken.getPassword(); + } + Token shuffleToken = + new Token(jobToken.getIdentifier(), shuffleSecret, + jobToken.getKind(), jobToken.getService()); + serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, + ShuffleHandler.serializeServiceData(shuffleToken)); + return taskCredentialsBuffer; + } + + private static void addExternalShuffleProviders(Configuration conf, + Map serviceData) { + // add external shuffle-providers - if any + Collection shuffleProviders = conf.getStringCollection( + MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES); + if (!shuffleProviders.isEmpty()) { + Collection auxNames = + conf.getStringCollection(YarnConfiguration.NM_AUX_SERVICES); + + for (final String shuffleProvider : shuffleProviders) { + if (shuffleProvider + .equals(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID)) { + continue; // skip built-in shuffle-provider that was already inserted + // with shuffle secret key + } + if (auxNames.contains(shuffleProvider)) { + LOG.info("Adding ShuffleProvider Service: " + shuffleProvider + + " to serviceData"); + // This only serves for INIT_APP notifications + // The shuffle service needs to be able to work with the host:port + // information provided by the AM + // (i.e. shuffle services which require custom location / other + // configuration are not supported) + serviceData.put(shuffleProvider, ByteBuffer.allocate(0)); + } else { + throw new YarnRuntimeException("ShuffleProvider Service: " + + shuffleProvider + + " was NOT found in the list of aux-services that are " + + "available in this NM. You may need to specify this " + + "ShuffleProvider as an aux-service in your yarn-site.xml"); + } + } + } + } + static ContainerLaunchContext createContainerLaunchContext( Map applicationACLs, Configuration conf, Token jobToken, Task remoteTask, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org