From: ctrezzo@apache.org
To: common-commits@hadoop.apache.org
Date: Thu, 12 Oct 2017 18:13:58 -0000
Subject: [2/2] hadoop git commit: MAPREDUCE-5951. Add support for the YARN Shared Cache.

MAPREDUCE-5951. Add support for the YARN Shared Cache.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e46d5bb9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e46d5bb9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e46d5bb9

Branch: refs/heads/trunk
Commit: e46d5bb962b0c942f993afc505b165b1cd96e51b
Parents: 13fcfb3
Author: Chris Trezzo
Authored: Thu Oct 12 10:58:02 2017 -0700
Committer: Chris Trezzo
Committed: Thu Oct 12 10:59:20 2017 -0700

----------------------------------------------------------------------
 .../mapreduce/v2/app/job/impl/JobImpl.java      |  16 +
 .../v2/app/job/impl/TaskAttemptImpl.java        |  52 ++-
 .../mapreduce/v2/util/LocalResourceBuilder.java | 169 ++++
 .../apache/hadoop/mapreduce/v2/util/MRApps.java | 137 ++----
 .../TestLocalDistributedCacheManager.java       |   9 +
 .../hadoop/mapreduce/v2/util/TestMRApps.java    |   8 +-
 .../hadoop-mapreduce-client-core/pom.xml        |   6 +
 .../java/org/apache/hadoop/mapreduce/Job.java   | 226 ++++
 .../hadoop/mapreduce/JobResourceUploader.java   | 416 ++++++++++++++++---
 .../apache/hadoop/mapreduce/MRJobConfig.java    |  71 ++++
 .../hadoop/mapreduce/SharedCacheConfig.java     | 102 +++++
 .../src/main/resources/mapred-default.xml       |  11 +
 .../src/site/markdown/SharedCacheSupport.md     | 100 +++++
 .../mapreduce/TestJobResourceUploader.java      |  76 ++--
 .../TestJobResourceUploaderWithSharedCache.java | 365 ++++++++++++++++
 .../org/apache/hadoop/mapred/YARNRunner.java    |  54 ++-
 .../hadoop/mapred/TestLocalJobSubmission.java   |  52 +++
 .../apache/hadoop/mapreduce/v2/TestMRJobs.java  |  59 +++
 hadoop-project/src/site/site.xml                |   1 +
 19 files changed, 1701 insertions(+), 229 deletions(-)
----------------------------------------------------------------------
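For anyone who wants to try the feature from client code, below is a minimal, untested sketch of how a job could opt into the shared cache using the configuration key and the static Job helpers added by this commit. The class name, job name, and jar URI are placeholders and not part of the patch; at submit time it is JobResourceUploader that actually consults the shared cache manager and decides whether each resource still needs to be uploaded.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SharedCacheExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Enable the shared cache for all supported resource categories
    // (MRJobConfig.SHARED_CACHE_MODE; the default is "disabled").
    conf.set("mapreduce.job.sharedcache.mode",
        "jobjar,libjars,files,archives");

    Job job = Job.getInstance(conf, "shared-cache-example");

    // Record a libjar for shared-cache processing and task classpath
    // inclusion. Returns false if the libjars category is not enabled.
    boolean viaSharedCache = Job.addFileToSharedCacheAndClasspath(
        new URI("hdfs:///libs/my-dependency.jar"), job.getConfiguration());

    System.out.println("libjar handled via shared cache: " + viaSharedCache);
  }
}
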
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 757c545..d2e2492 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.mapred.JobACLsManager; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TaskCompletionEvent; import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -1414,6 +1415,19 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, new char[] {'"', '=', '.'}); } + /* + * The goal is to make sure only the NM that hosts MRAppMaster will upload + * resources to shared cache. Clean up the shared cache policies for all + * resources so that later when TaskAttemptImpl creates + * ContainerLaunchContext, LocalResource.setShouldBeUploadedToSharedCache will + * be set up to false. In that way, the NMs that host the task containers + * won't try to upload the resources to shared cache. 
+ */ + private static void cleanupSharedCacheUploadPolicies(Configuration conf) { + Job.setArchiveSharedCacheUploadPolicies(conf, Collections.emptyMap()); + Job.setFileSharedCacheUploadPolicies(conf, Collections.emptyMap()); + } + public static class InitTransition implements MultipleArcTransition { @@ -1492,6 +1506,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, job.allowedReduceFailuresPercent = job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0); + cleanupSharedCacheUploadPolicies(job.conf); + // create the Tasks but don't start them yet createMapTasks(job, inputLength, taskSplitMetaInfo); createReduceTasks(job); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 650f387..00c7b84 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -708,17 +710,38 @@ public abstract class TaskAttemptImpl implements /** * Create a {@link LocalResource} record with all the given parameters. + * The NM that hosts AM container will upload resources to shared cache. + * Thus there is no need to ask task container's NM to upload the + * resources to shared cache. Set the shared cache upload policy to + * false. */ private static LocalResource createLocalResource(FileSystem fc, Path file, - LocalResourceType type, LocalResourceVisibility visibility) - throws IOException { + String fileSymlink, LocalResourceType type, + LocalResourceVisibility visibility) throws IOException { FileStatus fstat = fc.getFileStatus(file); - URL resourceURL = URL.fromPath(fc.resolvePath(fstat.getPath())); + // We need to be careful when converting from path to URL to add a fragment + // so that the symlink name when localized will be correct. + Path qualifiedPath = fc.resolvePath(fstat.getPath()); + URI uriWithFragment = null; + boolean useFragment = fileSymlink != null && !fileSymlink.equals(""); + try { + if (useFragment) { + uriWithFragment = new URI(qualifiedPath.toUri() + "#" + fileSymlink); + } else { + uriWithFragment = qualifiedPath.toUri(); + } + } catch (URISyntaxException e) { + throw new IOException( + "Error parsing local resource path." 
+ + " Path was not able to be converted to a URI: " + qualifiedPath, + e); + } + URL resourceURL = URL.fromURI(uriWithFragment); long resourceSize = fstat.getLen(); long resourceModificationTime = fstat.getModificationTime(); return LocalResource.newInstance(resourceURL, type, visibility, - resourceSize, resourceModificationTime); + resourceSize, resourceModificationTime, false); } /** @@ -829,8 +852,18 @@ public abstract class TaskAttemptImpl implements final FileSystem jobJarFs = FileSystem.get(jobJarPath.toUri(), conf); Path remoteJobJar = jobJarPath.makeQualified(jobJarFs.getUri(), jobJarFs.getWorkingDirectory()); - LocalResource rc = createLocalResource(jobJarFs, remoteJobJar, - LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION); + LocalResourceVisibility jobJarViz = + conf.getBoolean(MRJobConfig.JOBJAR_VISIBILITY, + MRJobConfig.JOBJAR_VISIBILITY_DEFAULT) + ? LocalResourceVisibility.PUBLIC + : LocalResourceVisibility.APPLICATION; + // We hard code the job.jar localized symlink in the container directory. + // This is because the mapreduce app expects the job.jar to be named + // accordingly. Additionally we set the shared cache upload policy to + // false. Resources are uploaded by the AM if necessary. + LocalResource rc = + createLocalResource(jobJarFs, remoteJobJar, MRJobConfig.JOB_JAR, + LocalResourceType.PATTERN, jobJarViz); String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern(); rc.setPattern(pattern); @@ -855,9 +888,12 @@ public abstract class TaskAttemptImpl implements Path remoteJobConfPath = new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE); FileSystem remoteFS = FileSystem.get(conf); + // There is no point to ask task container's NM to upload the resource + // to shared cache (job conf is not shared). Therefore, createLocalResource + // will set the shared cache upload policy to false localResources.put(MRJobConfig.JOB_CONF_FILE, - createLocalResource(remoteFS, remoteJobConfPath, LocalResourceType.FILE, - LocalResourceVisibility.APPLICATION)); + createLocalResource(remoteFS, remoteJobConfPath, null, + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION)); LOG.info("The job-conf file on the remote FS is " + remoteJobConfPath.toUri().toASCIIString()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/LocalResourceBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/LocalResourceBuilder.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/LocalResourceBuilder.java new file mode 100644 index 0000000..cb55e13 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/LocalResourceBuilder.java @@ -0,0 +1,169 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.util; + +import java.io.IOException; +import java.net.URI; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.InvalidJobConfException; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.filecache.DistributedCache; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.URL; + +/** + * Helper class for MR applications that parses distributed cache artifacts and + * creates a map of LocalResources. + */ +@SuppressWarnings("deprecation") +@Private +@Unstable +class LocalResourceBuilder { + public static final Log LOG = LogFactory.getLog(LocalResourceBuilder.class); + + private Configuration conf; + private LocalResourceType type; + private URI[] uris; + private long[] timestamps; + private long[] sizes; + private boolean[] visibilities; + private Map sharedCacheUploadPolicies; + + LocalResourceBuilder() { + } + + void setConf(Configuration c) { + this.conf = c; + } + + void setType(LocalResourceType t) { + this.type = t; + } + + void setUris(URI[] u) { + this.uris = u; + } + + void setTimestamps(long[] t) { + this.timestamps = t; + } + + void setSizes(long[] s) { + this.sizes = s; + } + + void setVisibilities(boolean[] v) { + this.visibilities = v; + } + + void setSharedCacheUploadPolicies(Map policies) { + this.sharedCacheUploadPolicies = policies; + } + + void createLocalResources(Map localResources) + throws IOException { + + if (uris != null) { + // Sanity check + if ((uris.length != timestamps.length) || (uris.length != sizes.length) || + (uris.length != visibilities.length)) { + throw new IllegalArgumentException("Invalid specification for " + + "distributed-cache artifacts of type " + type + " :" + + " #uris=" + uris.length + + " #timestamps=" + timestamps.length + + " #visibilities=" + visibilities.length + ); + } + + for (int i = 0; i < uris.length; ++i) { + URI u = uris[i]; + Path p = new Path(u); + FileSystem remoteFS = p.getFileSystem(conf); + String linkName = null; + + if (p.getName().equals(DistributedCache.WILDCARD)) { + p = p.getParent(); + linkName = p.getName() + Path.SEPARATOR + DistributedCache.WILDCARD; + } + + p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory())); + + // If there's no wildcard, try using the fragment for the link + if (linkName == null) { + linkName = u.getFragment(); + + // Because we don't know what's in the fragment, we have to handle + // it with care. 
+ if (linkName != null) { + Path linkPath = new Path(linkName); + + if (linkPath.isAbsolute()) { + throw new IllegalArgumentException("Resource name must be " + + "relative"); + } + + linkName = linkPath.toUri().getPath(); + } + } else if (u.getFragment() != null) { + throw new IllegalArgumentException("Invalid path URI: " + p + + " - cannot contain both a URI fragment and a wildcard"); + } + + // If there's no wildcard or fragment, just link to the file name + if (linkName == null) { + linkName = p.getName(); + } + + LocalResource orig = localResources.get(linkName); + if(orig != null && !orig.getResource().equals(URL.fromURI(p.toUri()))) { + throw new InvalidJobConfException( + getResourceDescription(orig.getType()) + orig.getResource() + + + " conflicts with " + getResourceDescription(type) + u); + } + Boolean sharedCachePolicy = sharedCacheUploadPolicies.get(u.toString()); + sharedCachePolicy = + sharedCachePolicy == null ? Boolean.FALSE : sharedCachePolicy; + localResources.put(linkName, LocalResource.newInstance(URL.fromURI(p + .toUri()), type, visibilities[i] ? LocalResourceVisibility.PUBLIC + : LocalResourceVisibility.PRIVATE, + sizes[i], timestamps[i], sharedCachePolicy)); + } + } + } + + private static String getResourceDescription(LocalResourceType type) { + if (type == LocalResourceType.ARCHIVE + || type == LocalResourceType.PATTERN) { + return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") "; + } + return "cache file (" + MRJobConfig.CACHE_FILES + ") "; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index a43da65..5777117 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -42,7 +42,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.InvalidJobConfException; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Task; import org.apache.hadoop.mapred.TaskLog; @@ -67,12 +67,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; -import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.Apps; -import org.apache.hadoop.yarn.util.ConverterUtils; /** * Helper class for MR applications @@ -251,10 +248,16 @@ public class MRApps extends Apps { if (!userClassesTakesPrecedence) { 
MRApps.setMRFrameworkClasspath(environment, conf); } + /* + * We use "*" for the name of the JOB_JAR instead of MRJobConfig.JOB_JAR for + * the case where the job jar is not necessarily named "job.jar". This can + * happen, for example, when the job is leveraging a resource from the YARN + * shared cache. + */ MRApps.addToEnvironment( environment, classpathEnvVar, - MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR, conf); + MRJobConfig.JOB_JAR + Path.SEPARATOR + "*", conf); MRApps.addToEnvironment( environment, classpathEnvVar, @@ -471,27 +474,32 @@ public class MRApps extends Apps { return startCommitFile; } - public static void setupDistributedCache( - Configuration conf, - Map localResources) - throws IOException { - + @SuppressWarnings("deprecation") + public static void setupDistributedCache(Configuration conf, + Map localResources) throws IOException { + + LocalResourceBuilder lrb = new LocalResourceBuilder(); + lrb.setConf(conf); + // Cache archives - parseDistributedCacheArtifacts(conf, localResources, - LocalResourceType.ARCHIVE, - DistributedCache.getCacheArchives(conf), - DistributedCache.getArchiveTimestamps(conf), - getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), - DistributedCache.getArchiveVisibilities(conf)); + lrb.setType(LocalResourceType.ARCHIVE); + lrb.setUris(DistributedCache.getCacheArchives(conf)); + lrb.setTimestamps(DistributedCache.getArchiveTimestamps(conf)); + lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES)); + lrb.setVisibilities(DistributedCache.getArchiveVisibilities(conf)); + lrb.setSharedCacheUploadPolicies( + Job.getArchiveSharedCacheUploadPolicies(conf)); + lrb.createLocalResources(localResources); // Cache files - parseDistributedCacheArtifacts(conf, - localResources, - LocalResourceType.FILE, - DistributedCache.getCacheFiles(conf), - DistributedCache.getFileTimestamps(conf), - getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES), - DistributedCache.getFileVisibilities(conf)); + lrb.setType(LocalResourceType.FILE); + lrb.setUris(DistributedCache.getCacheFiles(conf)); + lrb.setTimestamps(DistributedCache.getFileTimestamps(conf)); + lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES)); + lrb.setVisibilities(DistributedCache.getFileVisibilities(conf)); + lrb.setSharedCacheUploadPolicies( + Job.getFileSharedCacheUploadPolicies(conf)); + lrb.createLocalResources(localResources); } /** @@ -550,89 +558,6 @@ public class MRApps extends Apps { } } - private static String getResourceDescription(LocalResourceType type) { - if(type == LocalResourceType.ARCHIVE || type == LocalResourceType.PATTERN) { - return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") "; - } - return "cache file (" + MRJobConfig.CACHE_FILES + ") "; - } - - // TODO - Move this to MR! 
- // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], - // long[], boolean[], Path[], FileType) - private static void parseDistributedCacheArtifacts( - Configuration conf, - Map localResources, - LocalResourceType type, - URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[]) - throws IOException { - - if (uris != null) { - // Sanity check - if ((uris.length != timestamps.length) || (uris.length != sizes.length) || - (uris.length != visibilities.length)) { - throw new IllegalArgumentException("Invalid specification for " + - "distributed-cache artifacts of type " + type + " :" + - " #uris=" + uris.length + - " #timestamps=" + timestamps.length + - " #visibilities=" + visibilities.length - ); - } - - for (int i = 0; i < uris.length; ++i) { - URI u = uris[i]; - Path p = new Path(u); - FileSystem remoteFS = p.getFileSystem(conf); - String linkName = null; - - if (p.getName().equals(DistributedCache.WILDCARD)) { - p = p.getParent(); - linkName = p.getName() + Path.SEPARATOR + DistributedCache.WILDCARD; - } - - p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory())); - - // If there's no wildcard, try using the fragment for the link - if (linkName == null) { - linkName = u.getFragment(); - - // Because we don't know what's in the fragment, we have to handle - // it with care. - if (linkName != null) { - Path linkPath = new Path(linkName); - - if (linkPath.isAbsolute()) { - throw new IllegalArgumentException("Resource name must be " - + "relative"); - } - - linkName = linkPath.toUri().getPath(); - } - } else if (u.getFragment() != null) { - throw new IllegalArgumentException("Invalid path URI: " + p + - " - cannot contain both a URI fragment and a wildcard"); - } - - // If there's no wildcard or fragment, just link to the file name - if (linkName == null) { - linkName = p.getName(); - } - - LocalResource orig = localResources.get(linkName); - if(orig != null && !orig.getResource().equals(URL.fromURI(p.toUri()))) { - throw new InvalidJobConfException( - getResourceDescription(orig.getType()) + orig.getResource() + - " conflicts with " + getResourceDescription(type) + u); - } - localResources.put(linkName, LocalResource - .newInstance(URL.fromURI(p.toUri()), type, visibilities[i] - ? LocalResourceVisibility.PUBLIC : LocalResourceVisibility.PRIVATE, - sizes[i], timestamps[i])); - } - } - } - // TODO - Move this to MR! 
private static long[] getFileSizes(Configuration conf, String key) { String[] strs = conf.getStrings(key); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java index ec80e65..d2814e9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java @@ -30,6 +30,8 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -39,6 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.filecache.DistributedCache; @@ -164,6 +167,9 @@ public class TestLocalDistributedCacheManager { }); DistributedCache.addCacheFile(file, conf); + Map policies = new HashMap(); + policies.put(file.toString(), true); + Job.setFileSharedCacheUploadPolicies(conf, policies); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101"); conf.set(MRJobConfig.CACHE_FILES_SIZES, "201"); conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false"); @@ -272,6 +278,9 @@ public class TestLocalDistributedCacheManager { DistributedCache.addCacheFile(file, conf); DistributedCache.addCacheFile(file, conf); + Map policies = new HashMap(); + policies.put(file.toString(), true); + Job.setFileSharedCacheUploadPolicies(conf, policies); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101"); conf.set(MRJobConfig.CACHE_FILES_SIZES, "201,201"); conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false,false"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java index 1e33b12..3aadd63 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java @@ -261,7 +261,7 @@ public class TestMRApps { } String 
env_str = env.get("CLASSPATH"); String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR, - Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar", + Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/*", "job.jar/classes/", "job.jar/lib/*", ApplicationConstants.Environment.PWD.$$() + "/*")); assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!", @@ -281,7 +281,7 @@ public class TestMRApps { } String env_str = env.get("CLASSPATH"); String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR, - Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*", + Arrays.asList("job.jar/*", "job.jar/classes/", "job.jar/lib/*", ApplicationConstants.Environment.PWD.$$() + "/*")); assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not in" + " the classpath!", env_str.contains(expectedClasspath)); @@ -303,7 +303,7 @@ public class TestMRApps { assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!", cp.contains("PWD")); String expectedAppClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR, - Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar", + Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/*", "job.jar/classes/", "job.jar/lib/*", ApplicationConstants.Environment.PWD.$$() + "/*")); assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not in the app" @@ -332,7 +332,7 @@ public class TestMRApps { conf.set(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH, FRAMEWORK_CLASSPATH); MRApps.setClasspath(env, conf); final String stdClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR, - Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*", + Arrays.asList("job.jar/*", "job.jar/classes/", "job.jar/lib/*", ApplicationConstants.Environment.PWD.$$() + "/*")); String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR, Arrays.asList(ApplicationConstants.Environment.PWD.$$(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml index ce5fdc8..5e902d5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml @@ -52,6 +52,12 @@ test + org.apache.hadoop + hadoop-hdfs + test-jar + test + + org.skyscreamer jsonassert test http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java index 5530d95..a09f034 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -21,12 +21,17 @@ package org.apache.hadoop.mapreduce; import java.io.IOException; import java.net.URI; import java.security.PrivilegedExceptionAction; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration.IntegerRanges; import org.apache.hadoop.fs.FileSystem; @@ -1303,6 +1308,227 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable { } } + /** + * Add a file to job config for shared cache processing. If shared cache is + * enabled, it will return true, otherwise, return false. We don't check with + * SCM here given application might not be able to provide the job id; + * ClientSCMProtocol.use requires the application id. Job Submitter will read + * the files from job config and take care of things. + * + * @param resource The resource that Job Submitter will process later using + * shared cache. + * @param conf Configuration to add the resource to + * @return whether the resource has been added to the configuration + */ + @Unstable + public static boolean addFileToSharedCache(URI resource, Configuration conf) { + SharedCacheConfig scConfig = new SharedCacheConfig(); + scConfig.init(conf); + if (scConfig.isSharedCacheFilesEnabled()) { + String files = conf.get(MRJobConfig.FILES_FOR_SHARED_CACHE); + conf.set( + MRJobConfig.FILES_FOR_SHARED_CACHE, + files == null ? resource.toString() : files + "," + + resource.toString()); + return true; + } else { + return false; + } + } + + /** + * Add a file to job config for shared cache processing. If shared cache is + * enabled, it will return true, otherwise, return false. We don't check with + * SCM here given application might not be able to provide the job id; + * ClientSCMProtocol.use requires the application id. Job Submitter will read + * the files from job config and take care of things. Job Submitter will also + * add the file to classpath. Intended to be used by user code. + * + * @param resource The resource that Job Submitter will process later using + * shared cache. + * @param conf Configuration to add the resource to + * @return whether the resource has been added to the configuration + */ + @Unstable + public static boolean addFileToSharedCacheAndClasspath(URI resource, + Configuration conf) { + SharedCacheConfig scConfig = new SharedCacheConfig(); + scConfig.init(conf); + if (scConfig.isSharedCacheLibjarsEnabled()) { + String files = + conf.get(MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE); + conf.set( + MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE, + files == null ? resource.toString() : files + "," + + resource.toString()); + return true; + } else { + return false; + } + } + + /** + * Add an archive to job config for shared cache processing. If shared cache + * is enabled, it will return true, otherwise, return false. We don't check + * with SCM here given application might not be able to provide the job id; + * ClientSCMProtocol.use requires the application id. 
Job Submitter will read + * the files from job config and take care of things. Intended to be used by + * user code. + * + * @param resource The resource that Job Submitter will process later using + * shared cache. + * @param conf Configuration to add the resource to + * @return whether the resource has been added to the configuration + */ + @Unstable + public static boolean addArchiveToSharedCache(URI resource, + Configuration conf) { + SharedCacheConfig scConfig = new SharedCacheConfig(); + scConfig.init(conf); + if (scConfig.isSharedCacheArchivesEnabled()) { + String files = conf.get(MRJobConfig.ARCHIVES_FOR_SHARED_CACHE); + conf.set( + MRJobConfig.ARCHIVES_FOR_SHARED_CACHE, + files == null ? resource.toString() : files + "," + + resource.toString()); + return true; + } else { + return false; + } + } + + /** + * This is to set the shared cache upload policies for files. If the parameter + * was previously set, this method will replace the old value with the new + * provided map. + * + * @param conf Configuration which stores the shared cache upload policies + * @param policies A map containing the shared cache upload policies for a set + * of resources. The key is the url of the resource and the value is + * the upload policy. True if it should be uploaded, false otherwise. + */ + @Unstable + public static void setFileSharedCacheUploadPolicies(Configuration conf, + Map policies) { + setSharedCacheUploadPolicies(conf, policies, true); + } + + /** + * This is to set the shared cache upload policies for archives. If the + * parameter was previously set, this method will replace the old value with + * the new provided map. + * + * @param conf Configuration which stores the shared cache upload policies + * @param policies A map containing the shared cache upload policies for a set + * of resources. The key is the url of the resource and the value is + * the upload policy. True if it should be uploaded, false otherwise. + */ + @Unstable + public static void setArchiveSharedCacheUploadPolicies(Configuration conf, + Map policies) { + setSharedCacheUploadPolicies(conf, policies, false); + } + + // We use a double colon because a colon is a reserved character in a URI and + // there should not be two colons next to each other. + private static final String DELIM = "::"; + + /** + * Set the shared cache upload policies config parameter. This is done by + * serializing the provided map of shared cache upload policies into a config + * parameter. If the parameter was previously set, this method will replace + * the old value with the new provided map. + * + * @param conf Configuration which stores the shared cache upload policies + * @param policies A map containing the shared cache upload policies for a set + * of resources. The key is the url of the resource and the value is + * the upload policy. True if it should be uploaded, false otherwise. + * @param areFiles True if these policies are for files, false if they are for + * archives. + */ + private static void setSharedCacheUploadPolicies(Configuration conf, + Map policies, boolean areFiles) { + if (policies != null) { + StringBuilder sb = new StringBuilder(); + Iterator> it = policies.entrySet().iterator(); + Map.Entry e; + if (it.hasNext()) { + e = it.next(); + sb.append(e.getKey() + DELIM + e.getValue()); + } else { + // policies is an empty map, just skip setting the parameter + return; + } + while (it.hasNext()) { + e = it.next(); + sb.append("," + e.getKey() + DELIM + e.getValue()); + } + String confParam = + areFiles ? 
MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES + : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES; + conf.set(confParam, sb.toString()); + } + } + + /** + * Deserialize a map of shared cache upload policies from a config parameter. + * + * @param conf Configuration which stores the shared cache upload policies + * @param areFiles True if these policies are for files, false if they are for + * archives. + * @return A map containing the shared cache upload policies for a set of + * resources. The key is the url of the resource and the value is the + * upload policy. True if it should be uploaded, false otherwise. + */ + private static Map getSharedCacheUploadPolicies( + Configuration conf, boolean areFiles) { + String confParam = + areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES + : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES; + Collection policies = conf.getStringCollection(confParam); + String[] policy; + Map policyMap = new LinkedHashMap(); + for (String s : policies) { + policy = s.split(DELIM); + if (policy.length != 2) { + LOG.error(confParam + + " is mis-formatted, returning empty shared cache upload policies." + + " Error on [" + s + "]"); + return new LinkedHashMap(); + } + policyMap.put(policy[0], Boolean.parseBoolean(policy[1])); + } + return policyMap; + } + + /** + * This is to get the shared cache upload policies for files. + * + * @param conf Configuration which stores the shared cache upload policies + * @return A map containing the shared cache upload policies for a set of + * resources. The key is the url of the resource and the value is the + * upload policy. True if it should be uploaded, false otherwise. + */ + @Unstable + public static Map getFileSharedCacheUploadPolicies( + Configuration conf) { + return getSharedCacheUploadPolicies(conf, true); + } + + /** + * This is to get the shared cache upload policies for archives. + * + * @param conf Configuration which stores the shared cache upload policies + * @return A map containing the shared cache upload policies for a set of + * resources. The key is the url of the resource and the value is the + * upload policy. True if it should be uploaded, false otherwise. 
+ */ + @Unstable + public static Map getArchiveSharedCacheUploadPolicies( + Configuration conf) { + return getSharedCacheUploadPolicies(conf, false); + } + private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException { if (cluster == null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java index d9bf988..a044fc1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java @@ -24,12 +24,13 @@ import java.net.URISyntaxException; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; +import java.util.LinkedHashMap; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -40,30 +41,100 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; import org.apache.hadoop.mapreduce.filecache.DistributedCache; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.client.api.SharedCacheClient; +import org.apache.hadoop.yarn.exceptions.YarnException; import com.google.common.annotations.VisibleForTesting; -@InterfaceAudience.Private -@InterfaceStability.Unstable +/** + * This class is responsible for uploading resources from the client to HDFS + * that are associated with a MapReduce job. + */ +@Private +@Unstable class JobResourceUploader { protected static final Log LOG = LogFactory.getLog(JobResourceUploader.class); private final boolean useWildcard; private final FileSystem jtFs; + private SharedCacheClient scClient = null; + private SharedCacheConfig scConfig = new SharedCacheConfig(); + private ApplicationId appId = null; JobResourceUploader(FileSystem submitFs, boolean useWildcard) { this.jtFs = submitFs; this.useWildcard = useWildcard; } + private void initSharedCache(JobID jobid, Configuration conf) { + this.scConfig.init(conf); + if (this.scConfig.isSharedCacheEnabled()) { + this.scClient = createSharedCacheClient(conf); + appId = jobIDToAppId(jobid); + } + } + + /* + * We added this method so that we could do the conversion between JobId and + * ApplicationId for the shared cache client. This logic is very similar to + * the org.apache.hadoop.mapreduce.TypeConverter#toYarn method. 
We don't use + * that because mapreduce-client-core can not depend on + * mapreduce-client-common. + */ + private ApplicationId jobIDToAppId(JobID jobId) { + return ApplicationId.newInstance(Long.parseLong(jobId.getJtIdentifier()), + jobId.getId()); + } + + private void stopSharedCache() { + if (scClient != null) { + scClient.stop(); + scClient = null; + } + } + + /** + * Create, initialize and start a new shared cache client. + */ + @VisibleForTesting + protected SharedCacheClient createSharedCacheClient(Configuration conf) { + SharedCacheClient scc = SharedCacheClient.createSharedCacheClient(); + scc.init(conf); + scc.start(); + return scc; + } + /** * Upload and configure files, libjars, jobjars, and archives pertaining to * the passed job. - * + *
* <p>
+ * This client will use the shared cache for libjars, files, archives and + * jobjars if it is enabled. When shared cache is enabled, it will try to use + * the shared cache and fall back to the default behavior when the scm isn't + * available. + *
* <p>
+ * 1. For the resources that have been successfully shared, we will continue + * to use them in a shared fashion. + *
* <p>
+ * 2. For the resources that weren't in the cache and need to be uploaded by + * NM, we won't ask NM to upload them. + * * @param job the job containing the files to be uploaded * @param submitJobDir the submission directory of the job * @throws IOException */ public void uploadResources(Job job, Path submitJobDir) throws IOException { + try { + initSharedCache(job.getJobID(), job.getConfiguration()); + uploadResourcesInternal(job, submitJobDir); + } finally { + stopSharedCache(); + } + } + + private void uploadResourcesInternal(Job job, Path submitJobDir) + throws IOException { Configuration conf = job.getConfiguration(); short replication = (short) conf.getInt(Job.SUBMIT_REPLICATION, @@ -90,6 +161,7 @@ class JobResourceUploader { + " already exists!! This is unexpected.Please check what's there in" + " that directory"); } + // Create the submission directory for the MapReduce job. submitJobDir = jtFs.makeQualified(submitJobDir); submitJobDir = new Path(submitJobDir.toUri().getPath()); FsPermission mapredSysPerms = @@ -101,20 +173,45 @@ class JobResourceUploader { disableErasureCodingForPath(jtFs, submitJobDir); } + // Get the resources that have been added via command line arguments in the + // GenericOptionsParser (i.e. files, libjars, archives). Collection files = conf.getStringCollection("tmpfiles"); Collection libjars = conf.getStringCollection("tmpjars"); Collection archives = conf.getStringCollection("tmparchives"); String jobJar = job.getJar(); + // Merge resources that have been programmatically specified for the shared + // cache via the Job API. + files.addAll(conf.getStringCollection(MRJobConfig.FILES_FOR_SHARED_CACHE)); + libjars.addAll(conf.getStringCollection( + MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE)); + archives.addAll(conf + .getStringCollection(MRJobConfig.ARCHIVES_FOR_SHARED_CACHE)); + + Map statCache = new HashMap(); checkLocalizationLimits(conf, files, libjars, archives, jobJar, statCache); - uploadFiles(conf, files, submitJobDir, mapredSysPerms, replication); - uploadLibJars(conf, libjars, submitJobDir, mapredSysPerms, replication); - uploadArchives(conf, archives, submitJobDir, mapredSysPerms, replication); - uploadJobJar(job, jobJar, submitJobDir, replication); + Map fileSCUploadPolicies = + new LinkedHashMap(); + Map archiveSCUploadPolicies = + new LinkedHashMap(); + + uploadFiles(job, files, submitJobDir, mapredSysPerms, replication, + fileSCUploadPolicies, statCache); + uploadLibJars(job, libjars, submitJobDir, mapredSysPerms, replication, + fileSCUploadPolicies, statCache); + uploadArchives(job, archives, submitJobDir, mapredSysPerms, replication, + archiveSCUploadPolicies, statCache); + uploadJobJar(job, jobJar, submitJobDir, replication, statCache); addLog4jToDistributedCache(job, submitJobDir); + // Note, we do not consider resources in the distributed cache for the + // shared cache at this time. Only resources specified via the + // GenericOptionsParser or the jobjar. 
+ Job.setFileSharedCacheUploadPolicies(conf, fileSCUploadPolicies); + Job.setArchiveSharedCacheUploadPolicies(conf, archiveSCUploadPolicies); + // set the timestamps of the archives and files // set the public/private visibility of the archives and files ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf, @@ -125,9 +222,11 @@ class JobResourceUploader { } @VisibleForTesting - void uploadFiles(Configuration conf, Collection files, - Path submitJobDir, FsPermission mapredSysPerms, short submitReplication) + void uploadFiles(Job job, Collection files, + Path submitJobDir, FsPermission mapredSysPerms, short submitReplication, + Map fileSCUploadPolicies, Map statCache) throws IOException { + Configuration conf = job.getConfiguration(); Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir); if (!files.isEmpty()) { mkdirs(jtFs, filesDir, mapredSysPerms); @@ -140,17 +239,33 @@ class JobResourceUploader { + " Argument must be a valid URI: " + tmpFile, e); } Path tmp = new Path(tmpURI); - Path newPath = copyRemoteFiles(filesDir, tmp, conf, submitReplication); - try { - URI pathURI = getPathURI(newPath, tmpURI.getFragment()); - DistributedCache.addCacheFile(pathURI, conf); - } catch (URISyntaxException ue) { - // should not throw a uri exception - throw new IOException( - "Failed to create a URI (URISyntaxException) for the remote path " - + newPath + ". This was based on the files parameter: " - + tmpFile, - ue); + URI newURI = null; + boolean uploadToSharedCache = false; + if (scConfig.isSharedCacheFilesEnabled()) { + newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true); + if (newURI == null) { + uploadToSharedCache = true; + } + } + + if (newURI == null) { + Path newPath = + copyRemoteFiles(filesDir, tmp, conf, submitReplication); + try { + newURI = getPathURI(newPath, tmpURI.getFragment()); + } catch (URISyntaxException ue) { + // should not throw a uri exception + throw new IOException( + "Failed to create a URI (URISyntaxException) for the" + + " remote path " + newPath + + ". This was based on the files parameter: " + tmpFile, + ue); + } + } + + job.addCacheFile(newURI); + if (scConfig.isSharedCacheFilesEnabled()) { + fileSCUploadPolicies.put(newURI.toString(), uploadToSharedCache); } } } @@ -159,9 +274,11 @@ class JobResourceUploader { // Suppress warning for use of DistributedCache (it is everywhere). 
@SuppressWarnings("deprecation") @VisibleForTesting - void uploadLibJars(Configuration conf, Collection libjars, - Path submitJobDir, FsPermission mapredSysPerms, short submitReplication) + void uploadLibJars(Job job, Collection libjars, Path submitJobDir, + FsPermission mapredSysPerms, short submitReplication, + Map fileSCUploadPolicies, Map statCache) throws IOException { + Configuration conf = job.getConfiguration(); Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir); if (!libjars.isEmpty()) { mkdirs(jtFs, libjarsDir, mapredSysPerms); @@ -176,23 +293,53 @@ class JobResourceUploader { + " Argument must be a valid URI: " + tmpjars, e); } Path tmp = new Path(tmpURI); - Path newPath = - copyRemoteFiles(libjarsDir, tmp, conf, submitReplication); - try { - URI pathURI = getPathURI(newPath, tmpURI.getFragment()); - if (!foundFragment) { - foundFragment = pathURI.getFragment() != null; + URI newURI = null; + boolean uploadToSharedCache = false; + boolean fromSharedCache = false; + if (scConfig.isSharedCacheLibjarsEnabled()) { + newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true); + if (newURI == null) { + uploadToSharedCache = true; + } else { + fromSharedCache = true; + } + } + + if (newURI == null) { + Path newPath = + copyRemoteFiles(libjarsDir, tmp, conf, submitReplication); + try { + newURI = getPathURI(newPath, tmpURI.getFragment()); + } catch (URISyntaxException ue) { + // should not throw a uri exception + throw new IOException( + "Failed to create a URI (URISyntaxException) for the" + + " remote path " + newPath + + ". This was based on the libjar parameter: " + tmpjars, + ue); } - DistributedCache.addFileToClassPath(new Path(pathURI.getPath()), conf, - jtFs, false); - libjarURIs.add(pathURI); - } catch (URISyntaxException ue) { - // should not throw a uri exception - throw new IOException( - "Failed to create a URI (URISyntaxException) for the remote path " - + newPath + ". This was based on the libjar parameter: " - + tmpjars, - ue); + } + + if (!foundFragment) { + // We do not count shared cache paths containing fragments as a + // "foundFragment." This is because these resources are not in the + // staging directory and will be added to the distributed cache + // separately. + foundFragment = (newURI.getFragment() != null) && !fromSharedCache; + } + DistributedCache.addFileToClassPath(new Path(newURI.getPath()), conf, + jtFs, false); + if (fromSharedCache) { + // We simply add this URI to the distributed cache. It will not come + // from the staging directory (it is in the shared cache), so we + // must add it to the cache regardless of the wildcard feature. 
+ DistributedCache.addCacheFile(newURI, conf); + } else { + libjarURIs.add(newURI); + } + + if (scConfig.isSharedCacheLibjarsEnabled()) { + fileSCUploadPolicies.put(newURI.toString(), uploadToSharedCache); } } @@ -210,9 +357,11 @@ class JobResourceUploader { } @VisibleForTesting - void uploadArchives(Configuration conf, Collection archives, - Path submitJobDir, FsPermission mapredSysPerms, short submitReplication) - throws IOException { + void uploadArchives(Job job, Collection archives, + Path submitJobDir, FsPermission mapredSysPerms, short submitReplication, + Map archiveSCUploadPolicies, + Map statCache) throws IOException { + Configuration conf = job.getConfiguration(); Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir); if (!archives.isEmpty()) { mkdirs(jtFs, archivesDir, mapredSysPerms); @@ -225,18 +374,34 @@ class JobResourceUploader { + " Argument must be a valid URI: " + tmpArchives, e); } Path tmp = new Path(tmpURI); - Path newPath = - copyRemoteFiles(archivesDir, tmp, conf, submitReplication); - try { - URI pathURI = getPathURI(newPath, tmpURI.getFragment()); - DistributedCache.addCacheArchive(pathURI, conf); - } catch (URISyntaxException ue) { - // should not throw an uri excpetion - throw new IOException( - "Failed to create a URI (URISyntaxException) for the remote path" - + newPath + ". This was based on the archive parameter: " - + tmpArchives, - ue); + URI newURI = null; + boolean uploadToSharedCache = false; + if (scConfig.isSharedCacheArchivesEnabled()) { + newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true); + if (newURI == null) { + uploadToSharedCache = true; + } + } + + if (newURI == null) { + Path newPath = + copyRemoteFiles(archivesDir, tmp, conf, submitReplication); + try { + newURI = getPathURI(newPath, tmpURI.getFragment()); + } catch (URISyntaxException ue) { + // should not throw a uri exception + throw new IOException( + "Failed to create a URI (URISyntaxException) for the" + + " remote path " + newPath + + ". This was based on the archive parameter: " + + tmpArchives, + ue); + } + } + + job.addCacheArchive(newURI); + if (scConfig.isSharedCacheArchivesEnabled()) { + archiveSCUploadPolicies.put(newURI.toString(), uploadToSharedCache); } } } @@ -244,7 +409,9 @@ class JobResourceUploader { @VisibleForTesting void uploadJobJar(Job job, String jobJar, Path submitJobDir, - short submitReplication) throws IOException { + short submitReplication, Map statCache) + throws IOException { + Configuration conf = job.getConfiguration(); if (jobJar != null) { // copy jar to JobTracker's fs // use jar name if job is not named. if ("".equals(job.getJobName())) { @@ -252,12 +419,59 @@ class JobResourceUploader { } Path jobJarPath = new Path(jobJar); URI jobJarURI = jobJarPath.toUri(); - // If the job jar is already in a global fs, - // we don't need to copy it from local fs - if (jobJarURI.getScheme() == null || jobJarURI.getScheme().equals("file")) { - copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir), - submitReplication); - job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString()); + Path newJarPath = null; + boolean uploadToSharedCache = false; + if (jobJarURI.getScheme() == null || + jobJarURI.getScheme().equals("file")) { + // job jar is on the local file system + if (scConfig.isSharedCacheJobjarEnabled()) { + // We must have a qualified path for the shared cache client. 
We can + // assume this is for the local filesystem + jobJarPath = FileSystem.getLocal(conf).makeQualified(jobJarPath); + // Don't add a resource name here because the resource name (i.e. + // job.jar directory symlink) will always be hard coded to job.jar for + // the job.jar + URI newURI = + useSharedCache(jobJarPath.toUri(), null, statCache, conf, false); + if (newURI == null) { + uploadToSharedCache = true; + } else { + newJarPath = stringToPath(newURI.toString()); + // The job jar is coming from the shared cache (i.e. a public + // place), so we want the job.jar to have a public visibility. + conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true); + } + } + if (newJarPath == null) { + newJarPath = JobSubmissionFiles.getJobJar(submitJobDir); + copyJar(jobJarPath, newJarPath, submitReplication); + } + } else { + // job jar is in a remote file system + if (scConfig.isSharedCacheJobjarEnabled()) { + // Don't add a resource name here because the resource name (i.e. + // job.jar directory symlink) will always be hard coded to job.jar for + // the job.jar + URI newURI = useSharedCache(jobJarURI, null, statCache, conf, false); + if (newURI == null) { + uploadToSharedCache = true; + newJarPath = jobJarPath; + } else { + newJarPath = stringToPath(newURI.toString()); + // The job jar is coming from the shared cache (i.e. a public + // place), so we want the job.jar to have a public visibility. + conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true); + } + } else { + // we don't need to upload the jobjar to the staging directory because + // it is already in an accessible place + newJarPath = jobJarPath; + } + } + job.setJar(newJarPath.toString()); + if (scConfig.isSharedCacheJobjarEnabled()) { + conf.setBoolean(MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY, + uploadToSharedCache); } } else { LOG.warn("No job jar file set. User classes may not be found. " @@ -267,7 +481,9 @@ class JobResourceUploader { /** * Verify that the resources this job is going to localize are within the - * localization limits. + * localization limits. We count all resources towards these limits regardless + * of where they are coming from (i.e. local, distributed cache, or shared + * cache). */ @VisibleForTesting void checkLocalizationLimits(Configuration conf, Collection files, @@ -464,6 +680,80 @@ class JobResourceUploader { return newPath; } + /** + * Checksum a local resource file and call use for that resource with the scm. + */ + private URI useSharedCache(URI sourceFile, String resourceName, + Map statCache, Configuration conf, boolean honorFragment) + throws IOException { + if (scClient == null) { + return null; + } + Path filePath = new Path(sourceFile); + if (getFileStatus(statCache, conf, filePath).isDirectory()) { + LOG.warn("Shared cache does not support directories" + + " (see YARN-6097)." + " Will not upload " + filePath + + " to the shared cache."); + return null; + } + + String rn = resourceName; + if (honorFragment) { + if (sourceFile.getFragment() != null) { + rn = sourceFile.getFragment(); + } + } + + // If for whatever reason, we can't even calculate checksum for + // a resource, something is really wrong with the file system; + // even non-SCM approach won't work. Let us just throw the exception. 
+ String checksum = scClient.getFileChecksum(filePath); + URL url = null; + try { + url = scClient.use(this.appId, checksum); + } catch (YarnException e) { + LOG.warn("Error trying to contact the shared cache manager," + + " disabling the SCMClient for the rest of this job submission", e); + /* + * If we fail to contact the SCM, we do not use it for the rest of this + * JobResourceUploader's life. This prevents us from having to timeout + * each time we try to upload a file while the SCM is unavailable. Instead + * we timeout/error the first time and quickly revert to the default + * behavior without the shared cache. We do this by stopping the shared + * cache client and setting it to null. + */ + stopSharedCache(); + } + + if (url != null) { + // Because we deal with URI's in mapreduce, we need to convert the URL to + // a URI and add a fragment if necessary. + URI uri = null; + try { + String name = new Path(url.getFile()).getName(); + if (rn != null && !name.equals(rn)) { + // A name was specified that is different then the URL in the shared + // cache. Therefore, we need to set the fragment portion of the URI to + // preserve the user's desired name. We assume that there is no + // existing fragment in the URL since the shared cache manager does + // not use fragments. + uri = new URI(url.getScheme(), url.getUserInfo(), url.getHost(), + url.getPort(), url.getFile(), null, rn); + } else { + uri = new URI(url.getScheme(), url.getUserInfo(), url.getHost(), + url.getPort(), url.getFile(), null, null); + } + return uri; + } catch (URISyntaxException e) { + LOG.warn("Error trying to convert URL received from shared cache to" + + " a URI: " + url.toString()); + return null; + } + } else { + return null; + } + } + @VisibleForTesting void copyJar(Path originalJarPath, Path submitJarFile, short replication) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index cf59730..91541eb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -194,6 +194,77 @@ public interface MRJobConfig { public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities"; /** + * This parameter controls the visibility of the localized job jar on the node + * manager. If set to true, the visibility will be set to + * LocalResourceVisibility.PUBLIC. If set to false, the visibility will be set + * to LocalResourceVisibility.APPLICATION. This is a generated parameter and + * should not be set manually via config files. + */ + String JOBJAR_VISIBILITY = "mapreduce.job.jobjar.visibility"; + boolean JOBJAR_VISIBILITY_DEFAULT = false; + + /** + * This is a generated parameter and should not be set manually via config + * files. 
+ */ + String JOBJAR_SHARED_CACHE_UPLOAD_POLICY = + "mapreduce.job.jobjar.sharedcache.uploadpolicy"; + boolean JOBJAR_SHARED_CACHE_UPLOAD_POLICY_DEFAULT = false; + + /** + * This is a generated parameter and should not be set manually via config + * files. + */ + String CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES = + "mapreduce.job.cache.files.sharedcache.uploadpolicies"; + + /** + * This is a generated parameter and should not be set manually via config + * files. + */ + String CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES = + "mapreduce.job.cache.archives.sharedcache.uploadpolicies"; + + /** + * A comma delimited list of file resources that are needed for this MapReduce + * job. These resources, if the files resource type is enabled, should either + * use the shared cache or be added to the shared cache. This parameter can be + * modified programmatically using the MapReduce Job api. + */ + String FILES_FOR_SHARED_CACHE = "mapreduce.job.cache.sharedcache.files"; + + /** + * A comma delimited list of libjar resources that are needed for this + * MapReduce job. These resources, if the libjars resource type is enabled, + * should either use the shared cache or be added to the shared cache. These + * resources will also be added to the classpath of all tasks for this + * MapReduce job. This parameter can be modified programmatically using the + * MapReduce Job api. + */ + String FILES_FOR_CLASSPATH_AND_SHARED_CACHE = + "mapreduce.job.cache.sharedcache.files.addtoclasspath"; + + /** + * A comma delimited list of archive resources that are needed for this + * MapReduce job. These resources, if the archives resource type is enabled, + * should either use the shared cache or be added to the shared cache. This + * parameter can be modified programmatically using the MapReduce Job api. + */ + String ARCHIVES_FOR_SHARED_CACHE = + "mapreduce.job.cache.sharedcache.archives"; + + /** + * A comma delimited list of resource categories that are enabled for the + * shared cache. If a category is enabled, resources in that category will be + * uploaded to the shared cache. The valid categories are: jobjar, libjars, + * files, archives. If "disabled" is specified then all categories are + * disabled. If "enabled" is specified then all categories are enabled. + */ + String SHARED_CACHE_MODE = "mapreduce.job.sharedcache.mode"; + + String SHARED_CACHE_MODE_DEFAULT = "disabled"; + + /** * @deprecated Symlinks are always on and cannot be disabled. */ @Deprecated http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/SharedCacheConfig.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/SharedCacheConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/SharedCacheConfig.java new file mode 100644 index 0000000..de033e5 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/SharedCacheConfig.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce; + +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * A class for parsing configuration parameters associated with the shared + * cache. + */ +@Private +@Unstable +public class SharedCacheConfig { + protected static final Log LOG = LogFactory.getLog(SharedCacheConfig.class); + + private boolean sharedCacheFilesEnabled = false; + private boolean sharedCacheLibjarsEnabled = false; + private boolean sharedCacheArchivesEnabled = false; + private boolean sharedCacheJobjarEnabled = false; + + public void init(Configuration conf) { + if (!MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get( + MRConfig.FRAMEWORK_NAME))) { + // Shared cache is only valid if the job runs on yarn + return; + } + + if(!conf.getBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, + YarnConfiguration.DEFAULT_SHARED_CACHE_ENABLED)) { + return; + } + + + Collection configs = StringUtils.getTrimmedStringCollection( + conf.get(MRJobConfig.SHARED_CACHE_MODE, + MRJobConfig.SHARED_CACHE_MODE_DEFAULT)); + if (configs.contains("files")) { + this.sharedCacheFilesEnabled = true; + } + if (configs.contains("libjars")) { + this.sharedCacheLibjarsEnabled = true; + } + if (configs.contains("archives")) { + this.sharedCacheArchivesEnabled = true; + } + if (configs.contains("jobjar")) { + this.sharedCacheJobjarEnabled = true; + } + if (configs.contains("enabled")) { + this.sharedCacheFilesEnabled = true; + this.sharedCacheLibjarsEnabled = true; + this.sharedCacheArchivesEnabled = true; + this.sharedCacheJobjarEnabled = true; + } + if (configs.contains("disabled")) { + this.sharedCacheFilesEnabled = false; + this.sharedCacheLibjarsEnabled = false; + this.sharedCacheArchivesEnabled = false; + this.sharedCacheJobjarEnabled = false; + } + } + + public boolean isSharedCacheFilesEnabled() { + return sharedCacheFilesEnabled; + } + public boolean isSharedCacheLibjarsEnabled() { + return sharedCacheLibjarsEnabled; + } + public boolean isSharedCacheArchivesEnabled() { + return sharedCacheArchivesEnabled; + } + public boolean isSharedCacheJobjarEnabled() { + return sharedCacheJobjarEnabled; + } + public boolean isSharedCacheEnabled() { + return (sharedCacheFilesEnabled || sharedCacheLibjarsEnabled || + sharedCacheArchivesEnabled || sharedCacheJobjarEnabled); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 6b6faf2..9d166c7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -649,6 +649,17 @@
+<property>
+  <name>mapreduce.job.sharedcache.mode</name>
+  <value>disabled</value>
+  <description>
+     A comma delimited list of resource categories to submit to the shared cache.
+     The valid categories are: jobjar, libjars, files, archives.
+     If "disabled" is specified then the job submission code will not use
+     the shared cache.
+  </description>
+</property>
+
 <property>
   <name>mapreduce.input.fileinputformat.split.minsize</name>
   <value>0</value>
   <description>The minimum size chunk that map input should be split
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/SharedCacheSupport.md
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/SharedCacheSupport.md b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/SharedCacheSupport.md
new file mode 100644
index 0000000..9e3987c
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/SharedCacheSupport.md
@@ -0,0 +1,100 @@
+
+
+MR Support for YARN Shared Cache
+==================
+
+
+
+Overview
+-------
+
+MapReduce support for the YARN shared cache allows MapReduce jobs to take advantage
+of additional resource caching. This saves network bandwidth between the job
+submission client and the cluster, as well as within the YARN cluster itself. This will reduce job
+submission time and overall job runtime.
+
+
+Enabling/Disabling the shared cache
+-------
+
+First, your YARN cluster must have the shared cache service running. Please see the YARN documentation
+for information on how to set up the shared cache service.
+
+A MapReduce user can specify what resources are eligible to be uploaded to the shared cache
+based on resource type. This is done using a configuration parameter in mapred-site.xml:
+
+```
+<property>
+   <name>mapreduce.job.sharedcache.mode</name>
+   <value>disabled</value>
+   <description>
+      A comma delimited list of resource categories to submit to the
+      shared cache. The valid categories are: jobjar, libjars, files,
+      archives. If "disabled" is specified then the job submission code
+      will not use the shared cache.
+   </description>
+</property>
+```
+
+If a resource type is listed, the job submission client will check the shared cache to see if the
+resource is already in the cache. If so, it will use the cached resource; if not, it will mark the
+resource to be uploaded asynchronously.
+
+Specifying resources for the cache
+-------
+
+A MapReduce user has three ways to specify resources for a MapReduce job:
+
+1. **The command line via the generic options parser (i.e. -files, -archives, -libjars):** If a
+resource is specified via the command line and the resource type is enabled for the
+shared cache, that resource will use the shared cache.
+2. **The distributed cache api:** If a resource is specified via the distributed cache, the
+resource will not use the shared cache regardless of whether the resource type is enabled for
+the shared cache.
+3. **The shared cache api:** This is a new set of methods added to the
+org.apache.hadoop.mapreduce.Job api. It allows users to add a file to the shared cache,
+add a file to both the shared cache and the classpath, and add an archive to the shared cache.
+These resources will be placed in the distributed cache and, if their resource type is
+enabled, the client will use the shared cache as well (see the sketch after this list).
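To make the third option concrete, below is a minimal submission sketch. It is illustrative only: the mapreduce.job.sharedcache.mode key and its categories come from the patch above, but the shared cache methods on Job (addFileToSharedCache, addFileToSharedCacheAndClasspath, addArchiveToSharedCache, assumed here to be static and to take a URI plus the job Configuration) and the HDFS paths are assumptions; consult the Job javadoc added by this patch for the authoritative signatures.

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SharedCacheJobSubmitter {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Opt this job into the shared cache for selected resource categories
    // (mapreduce.job.sharedcache.mode, see mapred-default.xml above).
    conf.set("mapreduce.job.sharedcache.mode", "jobjar,libjars,files,archives");

    Job job = Job.getInstance(conf, "shared-cache-example");
    job.setJarByClass(SharedCacheJobSubmitter.class);

    // Assumed names for the new shared cache methods on Job (item 3 above);
    // URI fragments (#name) keep the localized resource names unique.
    Job.addFileToSharedCache(
        new URI("hdfs:///apps/lookup/terms.txt#terms.txt"), job.getConfiguration());
    Job.addFileToSharedCacheAndClasspath(
        new URI("hdfs:///apps/libs/helper.jar"), job.getConfiguration());
    Job.addArchiveToSharedCache(
        new URI("hdfs:///apps/dict/dict.zip#dict"), job.getConfiguration());

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```

If the shared cache service is unavailable or a resource is not yet cached, submission still succeeds: the resources travel through the normal distributed cache path and, where their category is enabled, are flagged for asynchronous upload to the cache.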
+
+Resource naming
+-------
+
+It is important to ensure that each resource for a MapReduce job has a unique file name.
+This prevents symlink clobbering when YARN containers running MapReduce tasks are localized
+during container launch. A user can specify their own resource name by using the fragment
+portion of a URI. For example, for file resources specified on the command line, it could look
+like this:
+```
+-files /local/path/file1.txt#foo.txt,/local/path2/file1.txt#bar.txt
+```
+In the above example, two files named file1.txt will be localized under two different names: foo.txt
+and bar.txt.
+
+Resource Visibility
+-------
+
+All resources in the shared cache have PUBLIC visibility.
+
+
+MapReduce client behavior while the shared cache is unavailable
+-------
+
+In the event that the shared cache manager is unavailable, the MapReduce client uses a fail-fast
+mechanism. If the MapReduce client fails to contact the shared cache manager, the client will
+no longer use the shared cache for the rest of that job submission. This
+prevents the MapReduce client from timing out each time it tries to check for a resource
+in the shared cache. The MapReduce client quickly reverts to the default behavior and submits the
+job as if the shared cache were never enabled in the first place.
\ No newline at end of file
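To illustrate that fail-fast behavior, here is a compact sketch modeled on the JobResourceUploader changes earlier in this patch. The SharedCacheClient calls (createSharedCacheClient, getFileChecksum, use) are the ones the patch itself uses; the wrapper class, its method names, and the surrounding error handling are illustrative assumptions rather than part of the commit.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.SharedCacheClient;
import org.apache.hadoop.yarn.exceptions.YarnException;

/** Illustrative fail-fast wrapper around the shared cache manager lookup. */
class FailFastSharedCacheLookup {
  private SharedCacheClient scClient;

  FailFastSharedCacheLookup(Configuration conf) {
    scClient = SharedCacheClient.createSharedCacheClient();
    scClient.init(conf);
    scClient.start();
  }

  /** Returns the cached URL, or null if the resource must be uploaded normally. */
  URL lookup(ApplicationId appId, Path resource) throws IOException {
    if (scClient == null) {
      return null; // SCM already marked unavailable for this submission.
    }
    try {
      String checksum = scClient.getFileChecksum(resource);
      return scClient.use(appId, checksum);
    } catch (YarnException e) {
      // First failure: stop the client and never retry during this job
      // submission, so later resources do not each pay a timeout.
      scClient.stop();
      scClient = null;
      return null;
    }
  }
}
```

The shape of the recovery path is the same as in the uploader: one failed call disables the shared cache for the remainder of the submission, and every subsequent resource silently takes the default (non shared cache) route.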