hadoop-common-commits mailing list archives

From ctre...@apache.org
Subject [2/2] hadoop git commit: MAPREDUCE-5951. Add support for the YARN Shared Cache.
Date Thu, 12 Oct 2017 18:16:18 GMT
MAPREDUCE-5951. Add support for the YARN Shared Cache.

(cherry picked from commit e46d5bb962b0c942f993afc505b165b1cd96e51b)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/24b03eb7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/24b03eb7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/24b03eb7

Branch: refs/heads/branch-3.0
Commit: 24b03eb79f4e40824d7fe971b2b57c91fc425c92
Parents: 97b7d39
Author: Chris Trezzo <ctrezzo@apache.org>
Authored: Thu Oct 12 10:58:02 2017 -0700
Committer: Chris Trezzo <ctrezzo@apache.org>
Committed: Thu Oct 12 11:15:46 2017 -0700

----------------------------------------------------------------------
 .../mapreduce/v2/app/job/impl/JobImpl.java      |  16 +
 .../v2/app/job/impl/TaskAttemptImpl.java        |  52 ++-
 .../mapreduce/v2/util/LocalResourceBuilder.java | 169 ++++++++
 .../apache/hadoop/mapreduce/v2/util/MRApps.java | 137 ++----
 .../TestLocalDistributedCacheManager.java       |   9 +
 .../hadoop/mapreduce/v2/util/TestMRApps.java    |   8 +-
 .../hadoop-mapreduce-client-core/pom.xml        |   6 +
 .../java/org/apache/hadoop/mapreduce/Job.java   | 226 ++++++++++
 .../hadoop/mapreduce/JobResourceUploader.java   | 416 ++++++++++++++++---
 .../apache/hadoop/mapreduce/MRJobConfig.java    |  71 ++++
 .../hadoop/mapreduce/SharedCacheConfig.java     | 102 +++++
 .../src/main/resources/mapred-default.xml       |  11 +
 .../src/site/markdown/SharedCacheSupport.md     | 100 +++++
 .../mapreduce/TestJobResourceUploader.java      |  76 ++--
 .../TestJobResourceUploaderWithSharedCache.java | 365 ++++++++++++++++
 .../org/apache/hadoop/mapred/YARNRunner.java    |  54 ++-
 .../hadoop/mapred/TestLocalJobSubmission.java   |  52 +++
 .../apache/hadoop/mapreduce/v2/TestMRJobs.java  |  59 +++
 hadoop-project/src/site/site.xml                |   1 +
 19 files changed, 1701 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/24b03eb7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 757c545..d2e2492 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -1414,6 +1415,19 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         new char[] {'"', '=', '.'});
   }
 
+  /*
+   * The goal is to make sure that only the NM that hosts the MRAppMaster will
+   * upload resources to the shared cache. Clean up the shared cache upload
+   * policies for all resources so that later, when TaskAttemptImpl creates the
+   * ContainerLaunchContext, LocalResource.setShouldBeUploadedToSharedCache
+   * will be called with false. That way, the NMs that host the task
+   * containers won't try to upload the resources to the shared cache.
+   */
+  private static void cleanupSharedCacheUploadPolicies(Configuration conf) {
+    Job.setArchiveSharedCacheUploadPolicies(conf, Collections.emptyMap());
+    Job.setFileSharedCacheUploadPolicies(conf, Collections.emptyMap());
+  }
+
   public static class InitTransition 
       implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
 
@@ -1492,6 +1506,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         job.allowedReduceFailuresPercent =
             job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
 
+        cleanupSharedCacheUploadPolicies(job.conf);
+
         // create the Tasks but don't start them yet
         createMapTasks(job, inputLength, taskSplitMetaInfo);
         createReduceTasks(job);

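For context: a minimal sketch (not part of the patch; the HDFS path, size, and timestamp are made up) of the six-argument LocalResource factory this mechanism relies on, where the trailing boolean is the shared cache upload policy that the cleanup above forces to false for task containers:

    import java.net.URI;
    import org.apache.hadoop.yarn.api.records.LocalResource;
    import org.apache.hadoop.yarn.api.records.LocalResourceType;
    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
    import org.apache.hadoop.yarn.api.records.URL;

    public class LocalResourceSketch {
      public static void main(String[] args) {
        // Illustrative values only; real callers take these from FileStatus.
        LocalResource rsrc = LocalResource.newInstance(
            URL.fromURI(URI.create("hdfs://nn:8020/staging/job_1/job.xml")),
            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
            1024L, 1507830000000L,
            false /* shouldBeUploadedToSharedCache */);
        System.out.println(rsrc.getShouldBeUploadedToSharedCache()); // false
      }
    }
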
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24b03eb7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 650f387..00c7b84 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -708,17 +710,38 @@ public abstract class TaskAttemptImpl implements
 
   /**
    * Create a {@link LocalResource} record with all the given parameters.
+   * The NM that hosts the AM container will upload resources to the shared
+   * cache, so there is no need to ask a task container's NM to upload the
+   * resources to the shared cache. The shared cache upload policy is
+   * therefore set to false.
    */
   private static LocalResource createLocalResource(FileSystem fc, Path file,
-      LocalResourceType type, LocalResourceVisibility visibility)
-      throws IOException {
+      String fileSymlink, LocalResourceType type,
+      LocalResourceVisibility visibility) throws IOException {
     FileStatus fstat = fc.getFileStatus(file);
-    URL resourceURL = URL.fromPath(fc.resolvePath(fstat.getPath()));
+    // When converting the path to a URL, we need to add a fragment so that
+    // the symlink name is correct when the resource is localized.
+    Path qualifiedPath = fc.resolvePath(fstat.getPath());
+    URI uriWithFragment = null;
+    boolean useFragment = fileSymlink != null && !fileSymlink.equals("");
+    try {
+      if (useFragment) {
+        uriWithFragment = new URI(qualifiedPath.toUri() + "#" + fileSymlink);
+      } else {
+        uriWithFragment = qualifiedPath.toUri();
+      }
+    } catch (URISyntaxException e) {
+      throw new IOException(
+          "Error parsing local resource path."
+              + " Path was not able to be converted to a URI: " + qualifiedPath,
+          e);
+    }
+    URL resourceURL = URL.fromURI(uriWithFragment);
     long resourceSize = fstat.getLen();
     long resourceModificationTime = fstat.getModificationTime();
 
     return LocalResource.newInstance(resourceURL, type, visibility,
-      resourceSize, resourceModificationTime);
+        resourceSize, resourceModificationTime, false);
   }
 
   /**
@@ -829,8 +852,18 @@ public abstract class TaskAttemptImpl implements
       final FileSystem jobJarFs = FileSystem.get(jobJarPath.toUri(), conf);
       Path remoteJobJar = jobJarPath.makeQualified(jobJarFs.getUri(),
           jobJarFs.getWorkingDirectory());
-      LocalResource rc = createLocalResource(jobJarFs, remoteJobJar,
-          LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION);
+      LocalResourceVisibility jobJarViz =
+          conf.getBoolean(MRJobConfig.JOBJAR_VISIBILITY,
+              MRJobConfig.JOBJAR_VISIBILITY_DEFAULT)
+                  ? LocalResourceVisibility.PUBLIC
+                  : LocalResourceVisibility.APPLICATION;
+      // We hard-code the job.jar localized symlink in the container directory
+      // because the MapReduce app expects the job.jar to be named accordingly.
+      // Additionally, we set the shared cache upload policy to false;
+      // resources are uploaded by the AM if necessary.
+      LocalResource rc =
+          createLocalResource(jobJarFs, remoteJobJar, MRJobConfig.JOB_JAR,
+              LocalResourceType.PATTERN, jobJarViz);
       String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
           JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
       rc.setPattern(pattern);
@@ -855,9 +888,12 @@ public abstract class TaskAttemptImpl implements
     Path remoteJobConfPath =
         new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
     FileSystem remoteFS = FileSystem.get(conf);
+    // There is no point in asking a task container's NM to upload the
+    // resource to the shared cache (the job conf is not shared). Therefore,
+    // createLocalResource will set the shared cache upload policy to false.
     localResources.put(MRJobConfig.JOB_CONF_FILE,
-        createLocalResource(remoteFS, remoteJobConfPath, LocalResourceType.FILE,
-            LocalResourceVisibility.APPLICATION));
+        createLocalResource(remoteFS, remoteJobConfPath, null,
+            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
     LOG.info("The job-conf file on the remote FS is "
         + remoteJobConfPath.toUri().toASCIIString());
   }

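A standalone sketch (illustrative; the jar path is made up) of the fragment handling above. YARN uses the URI fragment as the localized link name, which is why createLocalResource appends "#" plus the desired symlink to the qualified path:

    import java.net.URI;
    import java.net.URISyntaxException;

    public class FragmentSketch {
      public static void main(String[] args) throws URISyntaxException {
        URI qualified = new URI("hdfs://nn:8020/sharedcache/ab12/my-app-1.0.jar");
        String symlink = "job.jar"; // the hard-coded link name MapReduce expects
        URI withFragment = new URI(qualified + "#" + symlink);
        System.out.println(withFragment);               // ...my-app-1.0.jar#job.jar
        System.out.println(withFragment.getFragment()); // job.jar
      }
    }
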
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24b03eb7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/LocalResourceBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/LocalResourceBuilder.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/LocalResourceBuilder.java
new file mode 100644
index 0000000..cb55e13
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/LocalResourceBuilder.java
@@ -0,0 +1,169 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+
+/**
+ * Helper class for MR applications that parses distributed cache artifacts and
+ * creates a map of LocalResources.
+ */
+@SuppressWarnings("deprecation")
+@Private
+@Unstable
+class LocalResourceBuilder {
+  public static final Log LOG = LogFactory.getLog(LocalResourceBuilder.class);
+
+  private Configuration conf;
+  private LocalResourceType type;
+  private URI[] uris;
+  private long[] timestamps;
+  private long[] sizes;
+  private boolean[] visibilities;
+  private Map<String, Boolean> sharedCacheUploadPolicies;
+
+  LocalResourceBuilder() {
+  }
+
+  void setConf(Configuration c) {
+    this.conf = c;
+  }
+
+  void setType(LocalResourceType t) {
+    this.type = t;
+  }
+
+  void setUris(URI[] u) {
+    this.uris = u;
+  }
+
+  void setTimestamps(long[] t) {
+    this.timestamps = t;
+  }
+
+  void setSizes(long[] s) {
+    this.sizes = s;
+  }
+
+  void setVisibilities(boolean[] v) {
+    this.visibilities = v;
+  }
+
+  void setSharedCacheUploadPolicies(Map<String, Boolean> policies) {
+    this.sharedCacheUploadPolicies = policies;
+  }
+
+  void createLocalResources(Map<String, LocalResource> localResources)
+      throws IOException {
+
+    if (uris != null) {
+      // Sanity check
+      if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
+          (uris.length != visibilities.length)) {
+        throw new IllegalArgumentException("Invalid specification for " +
+            "distributed-cache artifacts of type " + type + " :" +
+            " #uris=" + uris.length +
+            " #timestamps=" + timestamps.length +
+            " #visibilities=" + visibilities.length
+            );
+      }
+
+      for (int i = 0; i < uris.length; ++i) {
+        URI u = uris[i];
+        Path p = new Path(u);
+        FileSystem remoteFS = p.getFileSystem(conf);
+        String linkName = null;
+
+        if (p.getName().equals(DistributedCache.WILDCARD)) {
+          p = p.getParent();
+          linkName = p.getName() + Path.SEPARATOR + DistributedCache.WILDCARD;
+        }
+
+        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+            remoteFS.getWorkingDirectory()));
+
+        // If there's no wildcard, try using the fragment for the link
+        if (linkName == null) {
+          linkName = u.getFragment();
+
+          // Because we don't know what's in the fragment, we have to handle
+          // it with care.
+          if (linkName != null) {
+            Path linkPath = new Path(linkName);
+
+            if (linkPath.isAbsolute()) {
+              throw new IllegalArgumentException("Resource name must be "
+                  + "relative");
+            }
+
+            linkName = linkPath.toUri().getPath();
+          }
+        } else if (u.getFragment() != null) {
+          throw new IllegalArgumentException("Invalid path URI: " + p +
+              " - cannot contain both a URI fragment and a wildcard");
+        }
+
+        // If there's no wildcard or fragment, just link to the file name
+        if (linkName == null) {
+          linkName = p.getName();
+        }
+
+        LocalResource orig = localResources.get(linkName);
+        if(orig != null && !orig.getResource().equals(URL.fromURI(p.toUri()))) {
+          throw new InvalidJobConfException(
+              getResourceDescription(orig.getType()) + orig.getResource()
+                  + " conflicts with " + getResourceDescription(type)
+                  + u);
+        }
+        Boolean sharedCachePolicy = sharedCacheUploadPolicies.get(u.toString());
+        sharedCachePolicy =
+            sharedCachePolicy == null ? Boolean.FALSE : sharedCachePolicy;
+        localResources.put(linkName, LocalResource.newInstance(URL.fromURI(p
+            .toUri()), type, visibilities[i] ? LocalResourceVisibility.PUBLIC
+                : LocalResourceVisibility.PRIVATE,
+            sizes[i], timestamps[i], sharedCachePolicy));
+      }
+    }
+  }
+
+  private static String getResourceDescription(LocalResourceType type) {
+    if (type == LocalResourceType.ARCHIVE
+        || type == LocalResourceType.PATTERN) {
+      return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";
+    }
+    return "cache file (" + MRJobConfig.CACHE_FILES + ") ";
+  }
+}

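To make the link-name rules in createLocalResources concrete, here is a simplified standalone sketch (illustrative only; it mirrors the three branches above with plain string handling instead of Hadoop's Path/FileSystem plumbing):

    import java.net.URI;

    public class LinkNameSketch {
      // Simplified rules: a wildcard wins and excludes fragments, a fragment
      // must be relative, and otherwise the file name is the link name.
      static String linkName(URI u) {
        String path = u.getPath();
        String base = path.substring(path.lastIndexOf('/') + 1);
        if (base.equals("*")) {
          if (u.getFragment() != null) {
            throw new IllegalArgumentException(
                "cannot contain both a URI fragment and a wildcard");
          }
          String parent = path.substring(0, path.lastIndexOf('/'));
          return parent.substring(parent.lastIndexOf('/') + 1) + "/*";
        }
        if (u.getFragment() != null) {
          if (u.getFragment().startsWith("/")) {
            throw new IllegalArgumentException("Resource name must be relative");
          }
          return u.getFragment();
        }
        return base;
      }

      public static void main(String[] args) {
        System.out.println(linkName(URI.create("hdfs://nn/cache/libs/*")));      // libs/*
        System.out.println(linkName(URI.create("hdfs://nn/cache/a.jar#alias"))); // alias
        System.out.println(linkName(URI.create("hdfs://nn/cache/a.jar")));       // a.jar
      }
    }
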
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24b03eb7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
index a43da65..5777117 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskLog;
@@ -67,12 +67,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.Apps;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 
 /**
  * Helper class for MR applications
@@ -251,10 +248,16 @@ public class MRApps extends Apps {
     if (!userClassesTakesPrecedence) {
       MRApps.setMRFrameworkClasspath(environment, conf);
     }
+    /*
+     * We use "*" instead of MRJobConfig.JOB_JAR as the jar entry name to
+     * cover the case where the job jar is not necessarily named "job.jar".
+     * This can happen, for example, when the job is leveraging a resource
+     * from the YARN shared cache.
+     */
     MRApps.addToEnvironment(
         environment,
         classpathEnvVar,
-        MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR, conf);
+        MRJobConfig.JOB_JAR + Path.SEPARATOR + "*", conf);
     MRApps.addToEnvironment(
         environment,
         classpathEnvVar,
@@ -471,27 +474,32 @@ public class MRApps extends Apps {
     return startCommitFile;
   }
 
-  public static void setupDistributedCache( 
-      Configuration conf, 
-      Map<String, LocalResource> localResources) 
-  throws IOException {
-    
+  @SuppressWarnings("deprecation")
+  public static void setupDistributedCache(Configuration conf,
+      Map<String, LocalResource> localResources) throws IOException {
+
+    LocalResourceBuilder lrb = new LocalResourceBuilder();
+    lrb.setConf(conf);
+
     // Cache archives
-    parseDistributedCacheArtifacts(conf, localResources,  
-        LocalResourceType.ARCHIVE, 
-        DistributedCache.getCacheArchives(conf), 
-        DistributedCache.getArchiveTimestamps(conf),
-        getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), 
-        DistributedCache.getArchiveVisibilities(conf));
+    lrb.setType(LocalResourceType.ARCHIVE);
+    lrb.setUris(DistributedCache.getCacheArchives(conf));
+    lrb.setTimestamps(DistributedCache.getArchiveTimestamps(conf));
+    lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES));
+    lrb.setVisibilities(DistributedCache.getArchiveVisibilities(conf));
+    lrb.setSharedCacheUploadPolicies(
+        Job.getArchiveSharedCacheUploadPolicies(conf));
+    lrb.createLocalResources(localResources);
     
     // Cache files
-    parseDistributedCacheArtifacts(conf, 
-        localResources,  
-        LocalResourceType.FILE, 
-        DistributedCache.getCacheFiles(conf),
-        DistributedCache.getFileTimestamps(conf),
-        getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
-        DistributedCache.getFileVisibilities(conf));
+    lrb.setType(LocalResourceType.FILE);
+    lrb.setUris(DistributedCache.getCacheFiles(conf));
+    lrb.setTimestamps(DistributedCache.getFileTimestamps(conf));
+    lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES));
+    lrb.setVisibilities(DistributedCache.getFileVisibilities(conf));
+    lrb.setSharedCacheUploadPolicies(
+        Job.getFileSharedCacheUploadPolicies(conf));
+    lrb.createLocalResources(localResources);
   }
 
   /**
@@ -550,89 +558,6 @@ public class MRApps extends Apps {
     }
   }
 
-  private static String getResourceDescription(LocalResourceType type) {
-    if(type == LocalResourceType.ARCHIVE || type == LocalResourceType.PATTERN) {
-      return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";
-    }
-    return "cache file (" + MRJobConfig.CACHE_FILES + ") ";
-  }
-  
-  // TODO - Move this to MR!
-  // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], 
-  // long[], boolean[], Path[], FileType)
-  private static void parseDistributedCacheArtifacts(
-      Configuration conf,
-      Map<String, LocalResource> localResources,
-      LocalResourceType type,
-      URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[])
-  throws IOException {
-
-    if (uris != null) {
-      // Sanity check
-      if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
-          (uris.length != visibilities.length)) {
-        throw new IllegalArgumentException("Invalid specification for " +
-            "distributed-cache artifacts of type " + type + " :" +
-            " #uris=" + uris.length +
-            " #timestamps=" + timestamps.length +
-            " #visibilities=" + visibilities.length
-            );
-      }
-      
-      for (int i = 0; i < uris.length; ++i) {
-        URI u = uris[i];
-        Path p = new Path(u);
-        FileSystem remoteFS = p.getFileSystem(conf);
-        String linkName = null;
-
-        if (p.getName().equals(DistributedCache.WILDCARD)) {
-          p = p.getParent();
-          linkName = p.getName() + Path.SEPARATOR + DistributedCache.WILDCARD;
-        }
-
-        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
-            remoteFS.getWorkingDirectory()));
-
-        // If there's no wildcard, try using the fragment for the link
-        if (linkName == null) {
-          linkName = u.getFragment();
-
-          // Because we don't know what's in the fragment, we have to handle
-          // it with care.
-          if (linkName != null) {
-            Path linkPath = new Path(linkName);
-
-            if (linkPath.isAbsolute()) {
-              throw new IllegalArgumentException("Resource name must be "
-                  + "relative");
-            }
-
-            linkName = linkPath.toUri().getPath();
-          }
-        } else if (u.getFragment() != null) {
-          throw new IllegalArgumentException("Invalid path URI: " + p +
-              " - cannot contain both a URI fragment and a wildcard");
-        }
-
-        // If there's no wildcard or fragment, just link to the file name
-        if (linkName == null) {
-          linkName = p.getName();
-        }
-
-        LocalResource orig = localResources.get(linkName);
-        if(orig != null && !orig.getResource().equals(URL.fromURI(p.toUri()))) {
-          throw new InvalidJobConfException(
-              getResourceDescription(orig.getType()) + orig.getResource() + 
-              " conflicts with " + getResourceDescription(type) + u);
-        }
-        localResources.put(linkName, LocalResource
-            .newInstance(URL.fromURI(p.toUri()), type, visibilities[i]
-            ? LocalResourceVisibility.PUBLIC : LocalResourceVisibility.PRIVATE,
-          sizes[i], timestamps[i]));
-      }
-    }
-  }
-  
   // TODO - Move this to MR!
   private static long[] getFileSizes(Configuration conf, String key) {
     String[] strs = conf.getStrings(key);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24b03eb7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
index ec80e65..d2814e9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
@@ -30,6 +30,8 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -39,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@@ -164,6 +167,9 @@ public class TestLocalDistributedCacheManager {
     });
 
     DistributedCache.addCacheFile(file, conf);
+    Map<String, Boolean> policies = new HashMap<String, Boolean>();
+    policies.put(file.toString(), true);
+    Job.setFileSharedCacheUploadPolicies(conf, policies);
     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101");
     conf.set(MRJobConfig.CACHE_FILES_SIZES, "201");
     conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false");
@@ -272,6 +278,9 @@ public class TestLocalDistributedCacheManager {
 
     DistributedCache.addCacheFile(file, conf);
     DistributedCache.addCacheFile(file, conf);
+    Map<String, Boolean> policies = new HashMap<String, Boolean>();
+    policies.put(file.toString(), true);
+    Job.setFileSharedCacheUploadPolicies(conf, policies);
     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101");
     conf.set(MRJobConfig.CACHE_FILES_SIZES, "201,201");
     conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false,false");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24b03eb7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
index 96b4e84..a6936e6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
@@ -260,7 +260,7 @@ public class TestMRApps {
     }
     String env_str = env.get("CLASSPATH");
     String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
-      Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar",
+        Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/*",
         "job.jar/classes/", "job.jar/lib/*",
         ApplicationConstants.Environment.PWD.$$() + "/*"));
     assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
@@ -280,7 +280,7 @@ public class TestMRApps {
     }
     String env_str = env.get("CLASSPATH");
     String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
-      Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
+        Arrays.asList("job.jar/*", "job.jar/classes/", "job.jar/lib/*",
         ApplicationConstants.Environment.PWD.$$() + "/*"));
     assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not in"
       + " the classpath!", env_str.contains(expectedClasspath));
@@ -302,7 +302,7 @@ public class TestMRApps {
     assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!",
       cp.contains("PWD"));
     String expectedAppClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
-      Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar",
+        Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/*",
         "job.jar/classes/", "job.jar/lib/*",
         ApplicationConstants.Environment.PWD.$$() + "/*"));
     assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not in the app"
@@ -331,7 +331,7 @@ public class TestMRApps {
     conf.set(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH, FRAMEWORK_CLASSPATH);
     MRApps.setClasspath(env, conf);
     final String stdClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
-        Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
+        Arrays.asList("job.jar/*", "job.jar/classes/", "job.jar/lib/*",
             ApplicationConstants.Environment.PWD.$$() + "/*"));
     String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
         Arrays.asList(ApplicationConstants.Environment.PWD.$$(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24b03eb7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
index 5b55d47..824c25e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
@@ -52,6 +52,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.skyscreamer</groupId>
       <artifactId>jsonassert</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24b03eb7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
index 5530d95..a09f034 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
@@ -21,12 +21,17 @@ package org.apache.hadoop.mapreduce;
 import java.io.IOException;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.FileSystem;
@@ -1303,6 +1308,227 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable {
     }   
   }
 
+  /**
+   * Add a file to the job config for shared cache processing. If the shared
+   * cache is enabled, this returns true; otherwise it returns false. We don't
+   * check with the SCM here because the application might not be able to
+   * provide the job id; ClientSCMProtocol.use requires the application id.
+   * The job submitter will read the files from the job config and handle them.
+   *
+   * @param resource The resource that Job Submitter will process later using
+   *          shared cache.
+   * @param conf Configuration to add the resource to
+   * @return whether the resource has been added to the configuration
+   */
+  @Unstable
+  public static boolean addFileToSharedCache(URI resource, Configuration conf) {
+    SharedCacheConfig scConfig = new SharedCacheConfig();
+    scConfig.init(conf);
+    if (scConfig.isSharedCacheFilesEnabled()) {
+      String files = conf.get(MRJobConfig.FILES_FOR_SHARED_CACHE);
+      conf.set(
+          MRJobConfig.FILES_FOR_SHARED_CACHE,
+          files == null ? resource.toString() : files + ","
+              + resource.toString());
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Add a file to the job config for shared cache processing. If the shared
+   * cache is enabled, this returns true; otherwise it returns false. We don't
+   * check with the SCM here because the application might not be able to
+   * provide the job id; ClientSCMProtocol.use requires the application id.
+   * The job submitter will read the files from the job config, handle them,
+   * and also add the file to the classpath. Intended to be used by user code.
+   *
+   * @param resource The resource that Job Submitter will process later using
+   *          shared cache.
+   * @param conf Configuration to add the resource to
+   * @return whether the resource has been added to the configuration
+   */
+  @Unstable
+  public static boolean addFileToSharedCacheAndClasspath(URI resource,
+      Configuration conf) {
+    SharedCacheConfig scConfig = new SharedCacheConfig();
+    scConfig.init(conf);
+    if (scConfig.isSharedCacheLibjarsEnabled()) {
+      String files =
+          conf.get(MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE);
+      conf.set(
+          MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE,
+          files == null ? resource.toString() : files + ","
+              + resource.toString());
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Add an archive to the job config for shared cache processing. If the
+   * shared cache is enabled, this returns true; otherwise it returns false.
+   * We don't check with the SCM here because the application might not be
+   * able to provide the job id; ClientSCMProtocol.use requires the
+   * application id. The job submitter will read the files from the job
+   * config and handle them. Intended to be used by user code.
+   *
+   * @param resource The resource that Job Submitter will process later using
+   *          shared cache.
+   * @param conf Configuration to add the resource to
+   * @return whether the resource has been added to the configuration
+   */
+  @Unstable
+  public static boolean addArchiveToSharedCache(URI resource,
+      Configuration conf) {
+    SharedCacheConfig scConfig = new SharedCacheConfig();
+    scConfig.init(conf);
+    if (scConfig.isSharedCacheArchivesEnabled()) {
+      String files = conf.get(MRJobConfig.ARCHIVES_FOR_SHARED_CACHE);
+      conf.set(
+          MRJobConfig.ARCHIVES_FOR_SHARED_CACHE,
+          files == null ? resource.toString() : files + ","
+              + resource.toString());
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Set the shared cache upload policies for files. If the parameter was
+   * previously set, this method replaces the old value with the newly
+   * provided map.
+   *
+   * @param conf Configuration which stores the shared cache upload policies
+   * @param policies A map containing the shared cache upload policies for a set
+   *          of resources. The key is the url of the resource and the value is
+   *          the upload policy. True if it should be uploaded, false otherwise.
+   */
+  @Unstable
+  public static void setFileSharedCacheUploadPolicies(Configuration conf,
+      Map<String, Boolean> policies) {
+    setSharedCacheUploadPolicies(conf, policies, true);
+  }
+
+  /**
+   * Set the shared cache upload policies for archives. If the parameter was
+   * previously set, this method replaces the old value with the newly
+   * provided map.
+   *
+   * @param conf Configuration which stores the shared cache upload policies
+   * @param policies A map containing the shared cache upload policies for a set
+   *          of resources. The key is the url of the resource and the value is
+   *          the upload policy. True if it should be uploaded, false otherwise.
+   */
+  @Unstable
+  public static void setArchiveSharedCacheUploadPolicies(Configuration conf,
+      Map<String, Boolean> policies) {
+    setSharedCacheUploadPolicies(conf, policies, false);
+  }
+
+  // We use a double colon as the delimiter because a colon is a reserved
+  // character in a URI and two colons should not appear next to each other.
+  private static final String DELIM = "::";
+
+  /**
+   * Set the shared cache upload policies config parameter. This is done by
+   * serializing the provided map of shared cache upload policies into a config
+   * parameter. If the parameter was previously set, this method will replace
+   * the old value with the new provided map.
+   *
+   * @param conf Configuration which stores the shared cache upload policies
+   * @param policies A map containing the shared cache upload policies for a set
+   *          of resources. The key is the url of the resource and the value is
+   *          the upload policy. True if it should be uploaded, false otherwise.
+   * @param areFiles True if these policies are for files, false if they are for
+   *          archives.
+   */
+  private static void setSharedCacheUploadPolicies(Configuration conf,
+      Map<String, Boolean> policies, boolean areFiles) {
+    if (policies != null) {
+      StringBuilder sb = new StringBuilder();
+      Iterator<Map.Entry<String, Boolean>> it = policies.entrySet().iterator();
+      Map.Entry<String, Boolean> e;
+      if (it.hasNext()) {
+        e = it.next();
+        sb.append(e.getKey() + DELIM + e.getValue());
+      } else {
+        // policies is an empty map, just skip setting the parameter
+        return;
+      }
+      while (it.hasNext()) {
+        e = it.next();
+        sb.append("," + e.getKey() + DELIM + e.getValue());
+      }
+      String confParam =
+          areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
+              : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
+      conf.set(confParam, sb.toString());
+    }
+  }
+
+  /**
+   * Deserialize a map of shared cache upload policies from a config parameter.
+   *
+   * @param conf Configuration which stores the shared cache upload policies
+   * @param areFiles True if these policies are for files, false if they are for
+   *          archives.
+   * @return A map containing the shared cache upload policies for a set of
+   *         resources. The key is the url of the resource and the value is the
+   *         upload policy. True if it should be uploaded, false otherwise.
+   */
+  private static Map<String, Boolean> getSharedCacheUploadPolicies(
+      Configuration conf, boolean areFiles) {
+    String confParam =
+        areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
+            : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
+    Collection<String> policies = conf.getStringCollection(confParam);
+    String[] policy;
+    Map<String, Boolean> policyMap = new LinkedHashMap<String, Boolean>();
+    for (String s : policies) {
+      policy = s.split(DELIM);
+      if (policy.length != 2) {
+        LOG.error(confParam
+            + " is mis-formatted, returning empty shared cache upload policies."
+            + " Error on [" + s + "]");
+        return new LinkedHashMap<String, Boolean>();
+      }
+      policyMap.put(policy[0], Boolean.parseBoolean(policy[1]));
+    }
+    return policyMap;
+  }
+
+  /**
+   * Get the shared cache upload policies for files.
+   *
+   * @param conf Configuration which stores the shared cache upload policies
+   * @return A map containing the shared cache upload policies for a set of
+   *         resources. The key is the url of the resource and the value is the
+   *         upload policy. True if it should be uploaded, false otherwise.
+   */
+  @Unstable
+  public static Map<String, Boolean> getFileSharedCacheUploadPolicies(
+      Configuration conf) {
+    return getSharedCacheUploadPolicies(conf, true);
+  }
+
+  /**
+   * Get the shared cache upload policies for archives.
+   *
+   * @param conf Configuration which stores the shared cache upload policies
+   * @return A map containing the shared cache upload policies for a set of
+   *         resources. The key is the url of the resource and the value is the
+   *         upload policy. True if it should be uploaded, false otherwise.
+   */
+  @Unstable
+  public static Map<String, Boolean> getArchiveSharedCacheUploadPolicies(
+      Configuration conf) {
+    return getSharedCacheUploadPolicies(conf, false);
+  }
+
   private synchronized void connect()
           throws IOException, InterruptedException, ClassNotFoundException {
     if (cluster == null) {

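A hedged usage sketch of the new public Job helpers (paths are illustrative; the serialized config key is assumed from MRJobConfig, whose diff is not shown in this message; behavior depends on the shared cache being enabled via SharedCacheConfig):

    import java.net.URI;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SharedCacheUsageSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Ask the submitter to route this jar through the shared cache;
        // returns false (and is a no-op) when shared cache files are disabled.
        boolean added = Job.addFileToSharedCache(
            URI.create("hdfs://nn:8020/libs/util.jar"), conf);

        // Upload policies round-trip through the config as url::policy pairs
        // joined by commas, per the DELIM logic above, e.g.
        //   hdfs://nn:8020/libs/a.jar::true,hdfs://nn:8020/data/b.txt::false
        Map<String, Boolean> policies = new LinkedHashMap<String, Boolean>();
        policies.put("hdfs://nn:8020/libs/a.jar", Boolean.TRUE);
        policies.put("hdfs://nn:8020/data/b.txt", Boolean.FALSE);
        Job.setFileSharedCacheUploadPolicies(conf, policies);
        Map<String, Boolean> roundTrip =
            Job.getFileSharedCacheUploadPolicies(conf);
        System.out.println(added + " " + roundTrip);
      }
    }
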
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24b03eb7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
index d9bf988..a044fc1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
@@ -24,12 +24,13 @@ import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -40,30 +41,100 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.client.api.SharedCacheClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 
 import com.google.common.annotations.VisibleForTesting;
 
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
+/**
+ * This class is responsible for uploading resources from the client to HDFS
+ * that are associated with a MapReduce job.
+ */
+@Private
+@Unstable
 class JobResourceUploader {
   protected static final Log LOG = LogFactory.getLog(JobResourceUploader.class);
   private final boolean useWildcard;
   private final FileSystem jtFs;
+  private SharedCacheClient scClient = null;
+  private SharedCacheConfig scConfig = new SharedCacheConfig();
+  private ApplicationId appId = null;
 
   JobResourceUploader(FileSystem submitFs, boolean useWildcard) {
     this.jtFs = submitFs;
     this.useWildcard = useWildcard;
   }
 
+  private void initSharedCache(JobID jobid, Configuration conf) {
+    this.scConfig.init(conf);
+    if (this.scConfig.isSharedCacheEnabled()) {
+      this.scClient = createSharedCacheClient(conf);
+      appId = jobIDToAppId(jobid);
+    }
+  }
+
+  /*
+   * We added this method so that we could convert between a JobID and an
+   * ApplicationId for the shared cache client. This logic is very similar to
+   * the org.apache.hadoop.mapreduce.TypeConverter#toYarn method. We don't use
+   * that method because mapreduce-client-core cannot depend on
+   * mapreduce-client-common.
+   */
+  private ApplicationId jobIDToAppId(JobID jobId) {
+    return ApplicationId.newInstance(Long.parseLong(jobId.getJtIdentifier()),
+        jobId.getId());
+  }
+
+  private void stopSharedCache() {
+    if (scClient != null) {
+      scClient.stop();
+      scClient = null;
+    }
+  }
+
+  /**
+   * Create, initialize and start a new shared cache client.
+   */
+  @VisibleForTesting
+  protected SharedCacheClient createSharedCacheClient(Configuration conf) {
+    SharedCacheClient scc = SharedCacheClient.createSharedCacheClient();
+    scc.init(conf);
+    scc.start();
+    return scc;
+  }
+
   /**
    * Upload and configure files, libjars, jobjars, and archives pertaining to
    * the passed job.
-   * 
+   * <p>
+   * This client will use the shared cache for libjars, files, archives and
+   * job jars if it is enabled. When the shared cache is enabled, it will try
+   * to use the shared cache and fall back to the default behavior when the
+   * SCM isn't available.
+   * <p>
+   * 1. For the resources that have been successfully shared, we will continue
+   * to use them in a shared fashion.
+   * <p>
+   * 2. For the resources that weren't in the cache and need to be uploaded by
+   * the NM, we won't ask the NM to upload them.
+   *
    * @param job the job containing the files to be uploaded
    * @param submitJobDir the submission directory of the job
    * @throws IOException
    */
   public void uploadResources(Job job, Path submitJobDir) throws IOException {
+    try {
+      initSharedCache(job.getJobID(), job.getConfiguration());
+      uploadResourcesInternal(job, submitJobDir);
+    } finally {
+      stopSharedCache();
+    }
+  }
+
+  private void uploadResourcesInternal(Job job, Path submitJobDir)
+      throws IOException {
     Configuration conf = job.getConfiguration();
     short replication =
         (short) conf.getInt(Job.SUBMIT_REPLICATION,
@@ -90,6 +161,7 @@ class JobResourceUploader {
           + " already exists!! This is unexpected.Please check what's there in"
           + " that directory");
     }
+    // Create the submission directory for the MapReduce job.
     submitJobDir = jtFs.makeQualified(submitJobDir);
     submitJobDir = new Path(submitJobDir.toUri().getPath());
     FsPermission mapredSysPerms =
@@ -101,20 +173,45 @@ class JobResourceUploader {
       disableErasureCodingForPath(jtFs, submitJobDir);
     }
 
+    // Get the resources that have been added via command line arguments in the
+    // GenericOptionsParser (i.e. files, libjars, archives).
     Collection<String> files = conf.getStringCollection("tmpfiles");
     Collection<String> libjars = conf.getStringCollection("tmpjars");
     Collection<String> archives = conf.getStringCollection("tmparchives");
     String jobJar = job.getJar();
 
+    // Merge resources that have been programmatically specified for the shared
+    // cache via the Job API.
+    files.addAll(conf.getStringCollection(MRJobConfig.FILES_FOR_SHARED_CACHE));
+    libjars.addAll(conf.getStringCollection(
+            MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE));
+    archives.addAll(conf
+        .getStringCollection(MRJobConfig.ARCHIVES_FOR_SHARED_CACHE));
+
+
     Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
     checkLocalizationLimits(conf, files, libjars, archives, jobJar, statCache);
 
-    uploadFiles(conf, files, submitJobDir, mapredSysPerms, replication);
-    uploadLibJars(conf, libjars, submitJobDir, mapredSysPerms, replication);
-    uploadArchives(conf, archives, submitJobDir, mapredSysPerms, replication);
-    uploadJobJar(job, jobJar, submitJobDir, replication);
+    Map<String, Boolean> fileSCUploadPolicies =
+        new LinkedHashMap<String, Boolean>();
+    Map<String, Boolean> archiveSCUploadPolicies =
+        new LinkedHashMap<String, Boolean>();
+
+    uploadFiles(job, files, submitJobDir, mapredSysPerms, replication,
+        fileSCUploadPolicies, statCache);
+    uploadLibJars(job, libjars, submitJobDir, mapredSysPerms, replication,
+        fileSCUploadPolicies, statCache);
+    uploadArchives(job, archives, submitJobDir, mapredSysPerms, replication,
+        archiveSCUploadPolicies, statCache);
+    uploadJobJar(job, jobJar, submitJobDir, replication, statCache);
     addLog4jToDistributedCache(job, submitJobDir);
 
+    // Note that we do not consider resources in the distributed cache for
+    // the shared cache at this time; only resources specified via the
+    // GenericOptionsParser or the job jar are considered.
+    Job.setFileSharedCacheUploadPolicies(conf, fileSCUploadPolicies);
+    Job.setArchiveSharedCacheUploadPolicies(conf, archiveSCUploadPolicies);
+
     // set the timestamps of the archives and files
     // set the public/private visibility of the archives and files
     ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf,
@@ -125,9 +222,11 @@ class JobResourceUploader {
   }
 
   @VisibleForTesting
-  void uploadFiles(Configuration conf, Collection<String> files,
-      Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
+  void uploadFiles(Job job, Collection<String> files,
+      Path submitJobDir, FsPermission mapredSysPerms, short submitReplication,
+      Map<String, Boolean> fileSCUploadPolicies, Map<URI, FileStatus> statCache)
       throws IOException {
+    Configuration conf = job.getConfiguration();
     Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
     if (!files.isEmpty()) {
       mkdirs(jtFs, filesDir, mapredSysPerms);
@@ -140,17 +239,33 @@ class JobResourceUploader {
               + " Argument must be a valid URI: " + tmpFile, e);
         }
         Path tmp = new Path(tmpURI);
-        Path newPath = copyRemoteFiles(filesDir, tmp, conf, submitReplication);
-        try {
-          URI pathURI = getPathURI(newPath, tmpURI.getFragment());
-          DistributedCache.addCacheFile(pathURI, conf);
-        } catch (URISyntaxException ue) {
-          // should not throw a uri exception
-          throw new IOException(
-              "Failed to create a URI (URISyntaxException) for the remote path "
-                  + newPath + ". This was based on the files parameter: "
-                  + tmpFile,
-              ue);
+        URI newURI = null;
+        boolean uploadToSharedCache = false;
+        if (scConfig.isSharedCacheFilesEnabled()) {
+          newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true);
+          if (newURI == null) {
+            uploadToSharedCache = true;
+          }
+        }
+
+        if (newURI == null) {
+          Path newPath =
+              copyRemoteFiles(filesDir, tmp, conf, submitReplication);
+          try {
+            newURI = getPathURI(newPath, tmpURI.getFragment());
+          } catch (URISyntaxException ue) {
+            // should not throw a uri exception
+            throw new IOException(
+                "Failed to create a URI (URISyntaxException) for the"
+                    + " remote path " + newPath
+                    + ". This was based on the files parameter: " + tmpFile,
+                ue);
+          }
+        }
+
+        job.addCacheFile(newURI);
+        if (scConfig.isSharedCacheFilesEnabled()) {
+          fileSCUploadPolicies.put(newURI.toString(), uploadToSharedCache);
         }
       }
     }
@@ -159,9 +274,11 @@ class JobResourceUploader {
   // Suppress warning for use of DistributedCache (it is everywhere).
   @SuppressWarnings("deprecation")
   @VisibleForTesting
-  void uploadLibJars(Configuration conf, Collection<String> libjars,
-      Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
+  void uploadLibJars(Job job, Collection<String> libjars, Path submitJobDir,
+      FsPermission mapredSysPerms, short submitReplication,
+      Map<String, Boolean> fileSCUploadPolicies, Map<URI, FileStatus> statCache)
       throws IOException {
+    Configuration conf = job.getConfiguration();
     Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
     if (!libjars.isEmpty()) {
       mkdirs(jtFs, libjarsDir, mapredSysPerms);
@@ -176,23 +293,53 @@ class JobResourceUploader {
               + " Argument must be a valid URI: " + tmpjars, e);
         }
         Path tmp = new Path(tmpURI);
-        Path newPath =
-            copyRemoteFiles(libjarsDir, tmp, conf, submitReplication);
-        try {
-          URI pathURI = getPathURI(newPath, tmpURI.getFragment());
-          if (!foundFragment) {
-            foundFragment = pathURI.getFragment() != null;
+        URI newURI = null;
+        boolean uploadToSharedCache = false;
+        boolean fromSharedCache = false;
+        if (scConfig.isSharedCacheLibjarsEnabled()) {
+          newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true);
+          if (newURI == null) {
+            uploadToSharedCache = true;
+          } else {
+            fromSharedCache = true;
+          }
+        }
+
+        if (newURI == null) {
+          Path newPath =
+              copyRemoteFiles(libjarsDir, tmp, conf, submitReplication);
+          try {
+            newURI = getPathURI(newPath, tmpURI.getFragment());
+          } catch (URISyntaxException ue) {
+            // should not throw a uri exception
+            throw new IOException(
+                "Failed to create a URI (URISyntaxException) for the"
+                    + " remote path " + newPath
+                    + ". This was based on the libjar parameter: " + tmpjars,
+                ue);
           }
-          DistributedCache.addFileToClassPath(new Path(pathURI.getPath()), conf,
-              jtFs, false);
-          libjarURIs.add(pathURI);
-        } catch (URISyntaxException ue) {
-          // should not throw a uri exception
-          throw new IOException(
-              "Failed to create a URI (URISyntaxException) for the remote path "
-                  + newPath + ". This was based on the libjar parameter: "
-                  + tmpjars,
-              ue);
+        }
+
+        if (!foundFragment) {
+          // We do not count shared cache paths containing fragments as a
+          // "foundFragment." This is because these resources are not in the
+          // staging directory and will be added to the distributed cache
+          // separately.
+          foundFragment = (newURI.getFragment() != null) && !fromSharedCache;
+        }
+        DistributedCache.addFileToClassPath(new Path(newURI.getPath()), conf,
+            jtFs, false);
+        if (fromSharedCache) {
+          // We simply add this URI to the distributed cache. It will not come
+          // from the staging directory (it is in the shared cache), so we
+          // must add it to the cache regardless of the wildcard feature.
+          DistributedCache.addCacheFile(newURI, conf);
+        } else {
+          libjarURIs.add(newURI);
+        }
+
+        if (scConfig.isSharedCacheLibjarsEnabled()) {
+          fileSCUploadPolicies.put(newURI.toString(), uploadToSharedCache);
         }
       }
 
@@ -210,9 +357,11 @@ class JobResourceUploader {
   }
 
   @VisibleForTesting
-  void uploadArchives(Configuration conf, Collection<String> archives,
-      Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
-      throws IOException {
+  void uploadArchives(Job job, Collection<String> archives,
+      Path submitJobDir, FsPermission mapredSysPerms, short submitReplication,
+      Map<String, Boolean> archiveSCUploadPolicies,
+      Map<URI, FileStatus> statCache) throws IOException {
+    Configuration conf = job.getConfiguration();
     Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
     if (!archives.isEmpty()) {
       mkdirs(jtFs, archivesDir, mapredSysPerms);
@@ -225,18 +374,34 @@ class JobResourceUploader {
               + " Argument must be a valid URI: " + tmpArchives, e);
         }
         Path tmp = new Path(tmpURI);
-        Path newPath =
-            copyRemoteFiles(archivesDir, tmp, conf, submitReplication);
-        try {
-          URI pathURI = getPathURI(newPath, tmpURI.getFragment());
-          DistributedCache.addCacheArchive(pathURI, conf);
-        } catch (URISyntaxException ue) {
-          // should not throw an uri excpetion
-          throw new IOException(
-              "Failed to create a URI (URISyntaxException) for the remote path"
-                  + newPath + ". This was based on the archive parameter: "
-                  + tmpArchives,
-              ue);
+        URI newURI = null;
+        boolean uploadToSharedCache = false;
+        if (scConfig.isSharedCacheArchivesEnabled()) {
+          newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true);
+          if (newURI == null) {
+            uploadToSharedCache = true;
+          }
+        }
+
+        if (newURI == null) {
+          Path newPath =
+              copyRemoteFiles(archivesDir, tmp, conf, submitReplication);
+          try {
+            newURI = getPathURI(newPath, tmpURI.getFragment());
+          } catch (URISyntaxException ue) {
+            // should not throw a URI exception
+            throw new IOException(
+                "Failed to create a URI (URISyntaxException) for the"
+                    + " remote path " + newPath
+                    + ". This was based on the archive parameter: "
+                    + tmpArchives,
+                ue);
+          }
+        }
+
+        job.addCacheArchive(newURI);
+        if (scConfig.isSharedCacheArchivesEnabled()) {
+          archiveSCUploadPolicies.put(newURI.toString(), uploadToSharedCache);
         }
       }
     }
@@ -244,7 +409,9 @@ class JobResourceUploader {
 
   @VisibleForTesting
   void uploadJobJar(Job job, String jobJar, Path submitJobDir,
-      short submitReplication) throws IOException {
+      short submitReplication, Map<URI, FileStatus> statCache)
+      throws IOException {
+    Configuration conf = job.getConfiguration();
     if (jobJar != null) { // copy jar to JobTracker's fs
       // use jar name if job is not named.
       if ("".equals(job.getJobName())) {
@@ -252,12 +419,59 @@ class JobResourceUploader {
       }
       Path jobJarPath = new Path(jobJar);
       URI jobJarURI = jobJarPath.toUri();
-      // If the job jar is already in a global fs,
-      // we don't need to copy it from local fs
-      if (jobJarURI.getScheme() == null || jobJarURI.getScheme().equals("file")) {
-        copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
-            submitReplication);
-        job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
+      Path newJarPath = null;
+      boolean uploadToSharedCache = false;
+      if (jobJarURI.getScheme() == null ||
+          jobJarURI.getScheme().equals("file")) {
+        // job jar is on the local file system
+        if (scConfig.isSharedCacheJobjarEnabled()) {
+          // We must have a qualified path for the shared cache client. We can
+          // assume this path is on the local filesystem.
+          jobJarPath = FileSystem.getLocal(conf).makeQualified(jobJarPath);
+          // Don't add a resource name here; the resource name (i.e. the
+          // job.jar directory symlink) is always hard-coded to job.jar for
+          // the job jar.
+          URI newURI =
+              useSharedCache(jobJarPath.toUri(), null, statCache, conf, false);
+          if (newURI == null) {
+            uploadToSharedCache = true;
+          } else {
+            newJarPath = stringToPath(newURI.toString());
+            // The job jar is coming from the shared cache (i.e. a public
+            // place), so we want the job.jar to have a public visibility.
+            conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true);
+          }
+        }
+        if (newJarPath == null) {
+          newJarPath = JobSubmissionFiles.getJobJar(submitJobDir);
+          copyJar(jobJarPath, newJarPath, submitReplication);
+        }
+      } else {
+        // job jar is in a remote file system
+        if (scConfig.isSharedCacheJobjarEnabled()) {
+        // Don't add a resource name here; the resource name (i.e. the
+        // job.jar directory symlink) is always hard-coded to job.jar for
+        // the job jar.
+          URI newURI = useSharedCache(jobJarURI, null, statCache, conf, false);
+          if (newURI == null) {
+            uploadToSharedCache = true;
+            newJarPath = jobJarPath;
+          } else {
+            newJarPath = stringToPath(newURI.toString());
+            // The job jar is coming from the shared cache (i.e. a public
+            // place), so we want the job.jar to have a public visibility.
+            conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true);
+          }
+        } else {
+          // we don't need to upload the jobjar to the staging directory because
+          // it is already in an accessible place
+          newJarPath = jobJarPath;
+        }
+      }
+      job.setJar(newJarPath.toString());
+      if (scConfig.isSharedCacheJobjarEnabled()) {
+        conf.setBoolean(MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY,
+            uploadToSharedCache);
       }
     } else {
       LOG.warn("No job jar file set.  User classes may not be found. "
@@ -267,7 +481,9 @@ class JobResourceUploader {
 
   /**
    * Verify that the resources this job is going to localize are within the
-   * localization limits.
+   * localization limits. We count all resources towards these limits regardless
+   * of where they are coming from (i.e. local, distributed cache, or shared
+   * cache).
    */
   @VisibleForTesting
   void checkLocalizationLimits(Configuration conf, Collection<String> files,
@@ -464,6 +680,80 @@ class JobResourceUploader {
     return newPath;
   }
 
+  /**
+   * Checksum a resource file and call use() for that resource with the SCM.
+   */
+  private URI useSharedCache(URI sourceFile, String resourceName,
+      Map<URI, FileStatus> statCache, Configuration conf, boolean honorFragment)
+      throws IOException {
+    if (scClient == null) {
+      return null;
+    }
+    Path filePath = new Path(sourceFile);
+    if (getFileStatus(statCache, conf, filePath).isDirectory()) {
+      LOG.warn("Shared cache does not support directories"
+          + " (see YARN-6097)." + " Will not upload " + filePath
+          + " to the shared cache.");
+      return null;
+    }
+
+    String rn = resourceName;
+    if (honorFragment) {
+      if (sourceFile.getFragment() != null) {
+        rn = sourceFile.getFragment();
+      }
+    }
+
+    // If, for whatever reason, we can't even calculate a checksum for a
+    // resource, something is really wrong with the file system; even the
+    // non-SCM approach won't work. Just let the exception propagate.
+    String checksum = scClient.getFileChecksum(filePath);
+    URL url = null;
+    try {
+      url = scClient.use(this.appId, checksum);
+    } catch (YarnException e) {
+      LOG.warn("Error trying to contact the shared cache manager,"
+          + " disabling the SCMClient for the rest of this job submission", e);
+      /*
+       * If we fail to contact the SCM, we do not use it for the rest of this
+       * JobResourceUploader's life. This prevents us from having to time out
+       * each time we try to upload a file while the SCM is unavailable.
+       * Instead, we time out or error the first time and quickly revert to
+       * the default behavior without the shared cache. We do this by stopping
+       * the shared cache client and setting it to null.
+       */
+      stopSharedCache();
+    }
+
+    if (url != null) {
+      // Because we deal with URIs in MapReduce, we need to convert the URL to
+      // a URI and add a fragment if necessary.
+      URI uri = null;
+      try {
+        String name = new Path(url.getFile()).getName();
+        if (rn != null && !name.equals(rn)) {
+          // A name was specified that is different than the URL in the shared
+          // cache. Therefore, we need to set the fragment portion of the URI to
+          // preserve the user's desired name. We assume that there is no
+          // existing fragment in the URL since the shared cache manager does
+          // not use fragments.
+          uri = new URI(url.getScheme(), url.getUserInfo(), url.getHost(),
+              url.getPort(), url.getFile(), null, rn);
+        } else {
+          uri = new URI(url.getScheme(), url.getUserInfo(), url.getHost(),
+              url.getPort(), url.getFile(), null, null);
+        }
+        return uri;
+      } catch (URISyntaxException e) {
+        LOG.warn("Error trying to convert URL received from shared cache to"
+            + " a URI: " + url.toString());
+        return null;
+      }
+    } else {
+      return null;
+    }
+  }
+
   @VisibleForTesting
   void copyJar(Path originalJarPath, Path submitJarFile,
       short replication) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24b03eb7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index cf59730..91541eb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -194,6 +194,77 @@ public interface MRJobConfig {
   public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";
 
   /**
+   * This parameter controls the visibility of the localized job jar on the node
+   * manager. If set to true, the visibility will be set to
+   * LocalResourceVisibility.PUBLIC. If set to false, the visibility will be set
+   * to LocalResourceVisibility.APPLICATION. This is a generated parameter and
+   * should not be set manually via config files.
+   */
+  String JOBJAR_VISIBILITY = "mapreduce.job.jobjar.visibility";
+  boolean JOBJAR_VISIBILITY_DEFAULT = false;
+
+  /**
+   * This is a generated parameter and should not be set manually via config
+   * files.
+   */
+  String JOBJAR_SHARED_CACHE_UPLOAD_POLICY =
+      "mapreduce.job.jobjar.sharedcache.uploadpolicy";
+  boolean JOBJAR_SHARED_CACHE_UPLOAD_POLICY_DEFAULT = false;
+
+  /**
+   * This is a generated parameter and should not be set manually via config
+   * files.
+   */
+  String CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES =
+      "mapreduce.job.cache.files.sharedcache.uploadpolicies";
+
+  /**
+   * This is a generated parameter and should not be set manually via config
+   * files.
+   */
+  String CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES =
+      "mapreduce.job.cache.archives.sharedcache.uploadpolicies";
+
+  /**
+   * A comma delimited list of file resources that are needed for this MapReduce
+   * job. These resources, if the files resource type is enabled, should either
+   * use the shared cache or be added to the shared cache. This parameter can be
+   * modified programmatically using the MapReduce Job API.
+   */
+  String FILES_FOR_SHARED_CACHE = "mapreduce.job.cache.sharedcache.files";
+
+  /**
+   * A comma delimited list of libjar resources that are needed for this
+   * MapReduce job. These resources, if the libjars resource type is enabled,
+   * should either use the shared cache or be added to the shared cache. These
+   * resources will also be added to the classpath of all tasks for this
+   * MapReduce job. This parameter can be modified programmatically using the
+   * MapReduce Job API.
+   */
+  String FILES_FOR_CLASSPATH_AND_SHARED_CACHE =
+      "mapreduce.job.cache.sharedcache.files.addtoclasspath";
+
+  /**
+   * A comma delimited list of archive resources that are needed for this
+   * MapReduce job. These resources, if the archives resource type is enabled,
+   * should either use the shared cache or be added to the shared cache. This
+   * parameter can be modified programmatically using the MapReduce Job API.
+   */
+  String ARCHIVES_FOR_SHARED_CACHE =
+      "mapreduce.job.cache.sharedcache.archives";
+
+  /**
+   * A comma delimited list of resource categories that are enabled for the
+   * shared cache. If a category is enabled, resources in that category will be
+   * uploaded to the shared cache. The valid categories are: jobjar, libjars,
+   * files, archives. If "disabled" is specified then all categories are
+   * disabled. If "enabled" is specified then all categories are enabled.
+   */
+  String SHARED_CACHE_MODE = "mapreduce.job.sharedcache.mode";
+
+  String SHARED_CACHE_MODE_DEFAULT = "disabled";
+
+  /**
    * @deprecated Symlinks are always on and cannot be disabled.
    */
   @Deprecated
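
These constants are ordinary Configuration keys, so a submitter can opt in before the normal submission path runs. A minimal sketch, assuming an illustrative mode value and job name:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class SharedCacheJobSetup {
  public static Job newJob() throws Exception {
    Configuration conf = new Configuration();
    // Enable the shared cache for the job jar and libjars only; "enabled" or
    // "disabled" would toggle every category at once.
    conf.set(MRJobConfig.SHARED_CACHE_MODE, "jobjar,libjars");
    // The upload-policy and visibility keys (e.g.
    // mapreduce.job.jobjar.sharedcache.uploadpolicy) are generated during
    // submission and should not be set by hand.
    return Job.getInstance(conf, "shared-cache-example");
  }
}
```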

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24b03eb7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/SharedCacheConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/SharedCacheConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/SharedCacheConfig.java
new file mode 100644
index 0000000..de033e5
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/SharedCacheConfig.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * A class for parsing configuration parameters associated with the shared
+ * cache.
+ */
+@Private
+@Unstable
+public class SharedCacheConfig {
+  protected static final Log LOG = LogFactory.getLog(SharedCacheConfig.class);
+
+  private boolean sharedCacheFilesEnabled = false;
+  private boolean sharedCacheLibjarsEnabled = false;
+  private boolean sharedCacheArchivesEnabled = false;
+  private boolean sharedCacheJobjarEnabled = false;
+
+  public void init(Configuration conf) {
+    if (!MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(
+        MRConfig.FRAMEWORK_NAME))) {
+      // Shared cache is only valid if the job runs on YARN
+      return;
+    }
+
+    if (!conf.getBoolean(YarnConfiguration.SHARED_CACHE_ENABLED,
+        YarnConfiguration.DEFAULT_SHARED_CACHE_ENABLED)) {
+      return;
+    }
+
+
+    Collection<String> configs = StringUtils.getTrimmedStringCollection(
+        conf.get(MRJobConfig.SHARED_CACHE_MODE,
+            MRJobConfig.SHARED_CACHE_MODE_DEFAULT));
+    if (configs.contains("files")) {
+      this.sharedCacheFilesEnabled = true;
+    }
+    if (configs.contains("libjars")) {
+      this.sharedCacheLibjarsEnabled = true;
+    }
+    if (configs.contains("archives")) {
+      this.sharedCacheArchivesEnabled = true;
+    }
+    if (configs.contains("jobjar")) {
+      this.sharedCacheJobjarEnabled = true;
+    }
+    if (configs.contains("enabled")) {
+      this.sharedCacheFilesEnabled = true;
+      this.sharedCacheLibjarsEnabled = true;
+      this.sharedCacheArchivesEnabled = true;
+      this.sharedCacheJobjarEnabled = true;
+    }
+    if (configs.contains("disabled")) {
+      this.sharedCacheFilesEnabled = false;
+      this.sharedCacheLibjarsEnabled = false;
+      this.sharedCacheArchivesEnabled = false;
+      this.sharedCacheJobjarEnabled = false;
+    }
+  }
+
+  public boolean isSharedCacheFilesEnabled() {
+    return sharedCacheFilesEnabled;
+  }
+  public boolean isSharedCacheLibjarsEnabled() {
+    return sharedCacheLibjarsEnabled;
+  }
+  public boolean isSharedCacheArchivesEnabled() {
+    return sharedCacheArchivesEnabled;
+  }
+  public boolean isSharedCacheJobjarEnabled() {
+    return sharedCacheJobjarEnabled;
+  }
+  public boolean isSharedCacheEnabled() {
+    return (sharedCacheFilesEnabled || sharedCacheLibjarsEnabled ||
+        sharedCacheArchivesEnabled || sharedCacheJobjarEnabled);
+  }
+}
\ No newline at end of file
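
A quick illustration of the parsing rules above: the YARN framework is required, the cluster-level YarnConfiguration switch must be on, and then the per-category flags follow the mode string. SharedCacheConfig is marked @Private/@Unstable, so this is a sketch for illustration only; the chosen mode value is an assumption:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.SharedCacheConfig;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class SharedCacheConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Both the YARN framework and the cluster-level switch are prerequisites.
    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
    conf.set(MRJobConfig.SHARED_CACHE_MODE, "libjars,archives");

    SharedCacheConfig scConfig = new SharedCacheConfig();
    scConfig.init(conf);
    System.out.println(scConfig.isSharedCacheLibjarsEnabled()); // true
    System.out.println(scConfig.isSharedCacheFilesEnabled());   // false
    System.out.println(scConfig.isSharedCacheEnabled());        // true
  }
}
```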

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24b03eb7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 6b6faf2..9d166c7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -649,6 +649,17 @@
 </property>
 
 <property>
+  <name>mapreduce.job.sharedcache.mode</name>
+  <value>disabled</value>
+  <description>
+    A comma delimited list of resource categories to submit to the shared cache.
+    The valid categories are: jobjar, libjars, files, archives.
+    If "disabled" is specified then the job submission code will not use
+    the shared cache.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.input.fileinputformat.split.minsize</name>
   <value>0</value>
   <description>The minimum size chunk that map input should be split

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24b03eb7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/SharedCacheSupport.md
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/SharedCacheSupport.md b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/SharedCacheSupport.md
new file mode 100644
index 0000000..9e3987c
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/SharedCacheSupport.md
@@ -0,0 +1,100 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+MR Support for YARN Shared Cache
+==================
+
+<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
+
+Overview
+-------
+
+MapReduce support for the YARN shared cache allows MapReduce jobs to take advantage
+of additional resource caching. This saves network bandwidth between the job
+submission client and the cluster, as well as within the YARN cluster itself, and
+reduces both job submission time and overall job runtime.
+
+
+Enabling/Disabling the shared cache
+-------
+
+First, your YARN cluster must have the shared cache service running. Please see the YARN
+documentation for information on how to set up the shared cache service.
+
+A MapReduce user can specify what resources are eligible to be uploaded to the shared cache
+based on resource type. This is done using a configuration parameter in mapred-site.xml:
+
+```
+<property>
+    <name>mapreduce.job.sharedcache.mode</name>
+    <value>disabled</value>
+    <description>
+       A comma delimited list of resource categories to submit to the
+       shared cache. The valid categories are: jobjar, libjars, files,
+       archives. If "disabled" is specified then the job submission code
+       will not use the shared cache.
+    </description>
+</property>
+```
+
+If a resource type is listed, the job submission client checks the shared cache to see if the
+resource is already in the cache. If so, the cached copy is used; if not, the client marks the
+resource for asynchronous upload to the shared cache.
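+
+For example, to let only the job jar and libjars use the shared cache, override the default
+in mapred-site.xml (the category choice here is illustrative):
+
+```
+<property>
+    <name>mapreduce.job.sharedcache.mode</name>
+    <value>jobjar,libjars</value>
+</property>
+```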
+
+Specifying resources for the cache
+-------
+
+A MapReduce user has three ways to specify resources for a MapReduce job:
+
+1. **The command line via the generic options parser (i.e. -files, -archives, -libjars):** If a
+resource is specified via the command line and the resource type is enabled for the
+shared cache, that resource will use the shared cache.
+2. **The distributed cache API:** If a resource is specified via the distributed cache, the
+resource will not use the shared cache, regardless of whether the resource type is enabled for
+the shared cache.
+3. **The shared cache API:** This is a new set of methods added to the
+org.apache.hadoop.mapreduce.Job API. It allows users to add a file to the shared cache, to add
+a file to both the shared cache and the classpath, and to add an archive to the shared cache
+(see the sketch after this list). These resources will be placed in the distributed cache and,
+if their resource type is enabled, the client will use the shared cache as well.
+
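+A hedged sketch of all three approaches follows; the shared-cache method names match this
+change's description of the new Job API but should be verified against the Job javadoc, and
+the paths are made up:
+
+```
+// Assumes a Job instance "job" and its Configuration "conf".
+// 1. Generic options parser (uses the shared cache when the type is enabled):
+//      hadoop jar app.jar MyDriver -files /local/dict.txt -libjars /local/dep.jar
+// 2. Distributed cache API (never consults the shared cache):
+job.addCacheFile(new URI("hdfs:///apps/data/dict.txt"));
+// 3. Shared cache API (placed in the distributed cache; the shared cache is
+//    consulted when the resource type is enabled):
+Job.addFileToSharedCache(new URI("hdfs:///apps/data/dict.txt"), conf);
+Job.addFileToSharedCacheAndClasspath(new URI("hdfs:///apps/lib/dep.jar"), conf);
+Job.addArchiveToSharedCache(new URI("hdfs:///apps/archives/bundle.zip"), conf);
+```
+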
+Resource naming
+-------
+
+It is important to ensure that each resource for a MapReduce job has a unique file name.
+This prevents symlink clobbering when YARN containers running MapReduce tasks are localized
+during container launch. A user can specify their own resource name by using the fragment
+portion of a URI. For example, for file resources specified on the command line, it could look
+like this:
+```
+-files /local/path/file1.txt#foo.txt,/local/path2/file1.txt#bar.txt
+```
+In the above example, two files, both named file1.txt, will be localized under two different
+names: foo.txt and bar.txt.
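+
+The same fragment convention applies to resources added programmatically, since the fragment
+travels with the URI (a hedged illustration; the path is made up):
+
+```
+// Localized under the symlink name "dict-v2.txt" instead of "dict.txt":
+job.addCacheFile(new URI("hdfs:///apps/data/dict.txt#dict-v2.txt"));
+```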
+
+Resource Visibility
+-------
+
+All resources in the shared cache have PUBLIC visibility.
+
+
+MapReduce client behavior while the shared cache is unavailable
+-------
+
+In the event that the shared cache manager is unavailable, the MapReduce client uses a fail-fast
+mechanism. If the MapReduce client fails to contact the shared cache manager, the client will
+no longer use the shared cache for the rest of that job submission. This
+prevents the MapReduce client from timing out each time it tries to check for a resource
+in the shared cache. The MapReduce client quickly reverts to the default behavior and submits
+the job as if the shared cache were never enabled in the first place.
\ No newline at end of file



