Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8893F11776 for ; Mon, 22 Sep 2014 23:48:06 +0000 (UTC) Received: (qmail 44591 invoked by uid 500); 22 Sep 2014 23:47:52 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 44384 invoked by uid 500); 22 Sep 2014 23:47:52 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 44056 invoked by uid 99); 22 Sep 2014 23:47:51 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Sep 2014 23:47:51 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id BCA898A639D; Mon, 22 Sep 2014 23:47:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: curino@apache.org To: common-commits@hadoop.apache.org Date: Mon, 22 Sep 2014 23:48:20 -0000 Message-Id: <76cef49081904e1f9a60d3ce1c2b1f0b@git.apache.org> In-Reply-To: <34999a8d244d4fdca3c9e56694254d93@git.apache.org> References: <34999a8d244d4fdca3c9e56694254d93@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [31/50] [abbrv] git commit: MAPREDUCE-6095. Enable DistributedCache for uber-mode Jobs. Contributed by Gera Shegalov MAPREDUCE-6095. Enable DistributedCache for uber-mode Jobs. 
Contributed by Gera Shegalov Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7039b98e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7039b98e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7039b98e Branch: refs/heads/YARN-1051 Commit: 7039b98e1c459e9e0d8caa28cdaa2868e2bde2eb Parents: 9721e2c Author: Jason Lowe Authored: Mon Sep 22 15:20:59 2014 +0000 Committer: Jason Lowe Committed: Mon Sep 22 15:20:59 2014 +0000 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 + .../org/apache/hadoop/mapred/YarnChild.java | 63 +------------------- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 1 + .../apache/hadoop/mapreduce/v2/util/MRApps.java | 61 +++++++++++++++++-- .../apache/hadoop/mapreduce/v2/TestUberAM.java | 7 --- 5 files changed, 62 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7039b98e/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index eb4c251..815b3fb 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -360,6 +360,9 @@ Release 2.6.0 - UNRELEASED ApplicationNotFoundException if the job rolled off the RM view (Sangjin Lee via jlowe) + MAPREDUCE-6095. 
Enable DistributedCache for uber-mode Jobs (Gera Shegalov + via jlowe) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/7039b98e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java index 4ba1991..92bbc4a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java @@ -23,15 +23,11 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; -import java.net.URI; import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.ScheduledExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSError; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; @@ -43,7 +39,6 @@ import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.counters.Limits; -import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import 
org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; @@ -307,7 +302,7 @@ class YarnChild { task.localizeConfiguration(job); // Set up the DistributedCache related configs - setupDistributedCacheConfig(job); + MRApps.setupDistributedCacheLocal(job); // Overwrite the localized task jobconf which is linked to in the current // work-dir. @@ -317,62 +312,6 @@ class YarnChild { task.setConf(job); } - /** - * Set up the DistributedCache related configs to make - * {@link DistributedCache#getLocalCacheFiles(Configuration)} - * and - * {@link DistributedCache#getLocalCacheArchives(Configuration)} - * working. - * @param job - * @throws IOException - */ - private static void setupDistributedCacheConfig(final JobConf job) - throws IOException { - - String localWorkDir = System.getenv("PWD"); - // ^ ^ all symlinks are created in the current work-dir - - // Update the configuration object with localized archives. - URI[] cacheArchives = DistributedCache.getCacheArchives(job); - if (cacheArchives != null) { - List<String> localArchives = new ArrayList<String>(); - for (int i = 0; i < cacheArchives.length; ++i) { - URI u = cacheArchives[i]; - Path p = new Path(u); - Path name = - new Path((null == u.getFragment()) ? p.getName() - : u.getFragment()); - String linkName = name.toUri().getPath(); - localArchives.add(new Path(localWorkDir, linkName).toUri().getPath()); - } - if (!localArchives.isEmpty()) { - job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils - .arrayToString(localArchives.toArray(new String[localArchives - .size()]))); - } - } - - // Update the configuration object with localized files. - URI[] cacheFiles = DistributedCache.getCacheFiles(job); - if (cacheFiles != null) { - List<String> localFiles = new ArrayList<String>(); - for (int i = 0; i < cacheFiles.length; ++i) { - URI u = cacheFiles[i]; - Path p = new Path(u); - Path name = - new Path((null == u.getFragment()) ? 
p.getName() - : u.getFragment()); - String linkName = name.toUri().getPath(); - localFiles.add(new Path(localWorkDir, linkName).toUri().getPath()); - } - if (!localFiles.isEmpty()) { - job.set(MRJobConfig.CACHE_LOCALFILES, - StringUtils.arrayToString(localFiles - .toArray(new String[localFiles.size()]))); - } - } - } - private static final FsPermission urw_gr = FsPermission.createImmutable((short) 0640); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7039b98e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 59e7249..1cf8b29 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -826,6 +826,7 @@ public class MRAppMaster extends CompositeService { @Override protected void serviceStart() throws Exception { if (job.isUber()) { + MRApps.setupDistributedCacheLocal(getConfig()); this.containerAllocator = new LocalContainerAllocator( this.clientService, this.context, nmHost, nmPort, nmHttpPort , containerID); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7039b98e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java ---------------------------------------------------------------------- diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index 3bd8414..113b445 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -26,12 +26,11 @@ import java.net.URISyntaxException; import java.security.AccessController; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import com.google.common.annotations.VisibleForTesting; @@ -58,8 +57,6 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.util.ApplicationClassLoader; -import org.apache.hadoop.util.Shell; -import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.ContainerLogAppender; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -469,6 +466,62 @@ public class MRApps extends Apps { DistributedCache.getFileVisibilities(conf)); } + /** + * Set up the DistributedCache related configs to make + * {@link DistributedCache#getLocalCacheFiles(Configuration)} + * and + * {@link DistributedCache#getLocalCacheArchives(Configuration)} + * working. 
+ * @param conf + * @throws java.io.IOException + */ + public static void setupDistributedCacheLocal(Configuration conf) + throws IOException { + + String localWorkDir = System.getenv("PWD"); + // ^ ^ all symlinks are created in the current work-dir + + // Update the configuration object with localized archives. + URI[] cacheArchives = DistributedCache.getCacheArchives(conf); + if (cacheArchives != null) { + List<String> localArchives = new ArrayList<String>(); + for (int i = 0; i < cacheArchives.length; ++i) { + URI u = cacheArchives[i]; + Path p = new Path(u); + Path name = + new Path((null == u.getFragment()) ? p.getName() + : u.getFragment()); + String linkName = name.toUri().getPath(); + localArchives.add(new Path(localWorkDir, linkName).toUri().getPath()); + } + if (!localArchives.isEmpty()) { + conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils + .arrayToString(localArchives.toArray(new String[localArchives + .size()]))); + } + } + + // Update the configuration object with localized files. + URI[] cacheFiles = DistributedCache.getCacheFiles(conf); + if (cacheFiles != null) { + List<String> localFiles = new ArrayList<String>(); + for (int i = 0; i < cacheFiles.length; ++i) { + URI u = cacheFiles[i]; + Path p = new Path(u); + Path name = + new Path((null == u.getFragment()) ? 
p.getName() + : u.getFragment()); + String linkName = name.toUri().getPath(); + localFiles.add(new Path(localWorkDir, linkName).toUri().getPath()); + } + if (!localFiles.isEmpty()) { + conf.set(MRJobConfig.CACHE_LOCALFILES, + StringUtils.arrayToString(localFiles + .toArray(new String[localFiles.size()]))); + } + } + } + private static String getResourceDescription(LocalResourceType type) { if(type == LocalResourceType.ARCHIVE || type == LocalResourceType.PATTERN) { return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") "; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7039b98e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java index 32199e5..e89a919 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java @@ -191,11 +191,4 @@ public class TestUberAM extends TestMRJobs { throws IOException, InterruptedException, ClassNotFoundException { super.testSleepJobWithSecurityOn(); } - - // Add a test for distcache when uber mode is enabled. TODO - @Override - @Test - public void testDistributedCache() throws Exception { - // - } }