Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1276D92CD for ; Mon, 11 Jun 2012 20:37:26 +0000 (UTC) Received: (qmail 93838 invoked by uid 500); 11 Jun 2012 20:37:25 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 93724 invoked by uid 500); 11 Jun 2012 20:37:25 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 93712 invoked by uid 99); 11 Jun 2012 20:37:25 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Jun 2012 20:37:25 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Jun 2012 20:37:24 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id ECC342388860; Mon, 11 Jun 2012 20:37:03 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1348997 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/h... Date: Mon, 11 Jun 2012 20:37:03 -0000 To: mapreduce-commits@hadoop.apache.org From: tomwhite@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120611203703.ECC342388860@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tomwhite Date: Mon Jun 11 20:37:03 2012 New Revision: 1348997 URL: http://svn.apache.org/viewvc?rev=1348997&view=rev Log: MAPREDUCE-3871. Allow symlinking in LocalJobRunner DistributedCache. Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1348997&r1=1348996&r2=1348997&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Jun 11 20:37:03 2012 @@ -129,6 +129,9 @@ Branch-2 ( Unreleased changes ) MAPREDUCE-4146. Support limits on task status string length and number of block locations in branch-2. (Ahmed Radwan via tomwhite) + MAPREDUCE-3871. Allow symlinking in LocalJobRunner DistributedCache. + (tomwhite) + OPTIMIZATIONS BUG FIXES Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java?rev=1348997&r1=1348996&r2=1348997&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java Mon Jun 11 20:37:03 2012 @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import java.io.File; import java.io.IOException; import java.net.MalformedURLException; +import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.net.URLClassLoader; @@ -45,6 +46,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRConfig; @@ -72,6 +74,8 @@ class LocalDistributedCacheManager { private List localFiles = new ArrayList(); private List localClasspaths = new ArrayList(); + private List symlinksCreated = new ArrayList(); + private boolean setupCalled = false; /** @@ -172,18 +176,51 @@ class LocalDistributedCacheManager { .size()]))); } if (DistributedCache.getSymlink(conf)) { - // This is not supported largely because, - // for a Child subprocess, the cwd in LocalJobRunner - // is not a fresh slate, but rather the user's working directory. - // This is further complicated because the logic in - // setupWorkDir only creates symlinks if there's a jarfile - // in the configuration. - LOG.warn("LocalJobRunner does not support " + - "symlinking into current working dir."); + File workDir = new File(System.getProperty("user.dir")); + URI[] archives = DistributedCache.getCacheArchives(conf); + URI[] files = DistributedCache.getCacheFiles(conf); + Path[] localArchives = DistributedCache.getLocalCacheArchives(conf); + Path[] localFiles = DistributedCache.getLocalCacheFiles(conf); + if (archives != null) { + for (int i = 0; i < archives.length; i++) { + String link = archives[i].getFragment(); + String target = new File(localArchives[i].toUri()).getPath(); + symlink(workDir, target, link); + } + } + if (files != null) { + for (int i = 0; i < files.length; i++) { + String link = files[i].getFragment(); + String target = new File(localFiles[i].toUri()).getPath(); + symlink(workDir, target, link); + } + } } setupCalled = true; } + /** + * Utility method for creating a symlink and warning on errors. + * + * If link is null, does nothing. + */ + private void symlink(File workDir, String target, String link) + throws IOException { + if (link != null) { + link = workDir.toString() + Path.SEPARATOR + link; + File flink = new File(link); + if (!flink.exists()) { + LOG.info(String.format("Creating symlink: %s <- %s", target, link)); + if (0 != FileUtil.symLink(target, link)) { + LOG.warn(String.format("Failed to create symlink: %s <- %s", target, + link)); + } else { + symlinksCreated.add(new File(link)); + } + } + } + } + /** * Are the resources that should be added to the classpath? * Should be called after setup(). @@ -217,6 +254,12 @@ class LocalDistributedCacheManager { } public void close() throws IOException { + for (File symlink : symlinksCreated) { + if (!symlink.delete()) { + LOG.warn("Failed to delete symlink created by the local job runner: " + + symlink); + } + } FileContext localFSFileContext = FileContext.getLocalFSFileContext(); for (String archive : localArchives) { localFSFileContext.delete(new Path(archive), true); Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=1348997&r1=1348996&r2=1348997&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Mon Jun 11 20:37:03 2012 @@ -23,6 +23,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Arrays; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; @@ -61,6 +62,9 @@ import org.apache.hadoop.mapreduce.serve public class TestMRWithDistributedCache extends TestCase { private static Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/tmp")); + private static File symlinkFile = new File("distributed.first.symlink"); + private static File expectedAbsentSymlinkFile = + new File("distributed.second.jar"); private static Configuration conf = new Configuration(); private static FileSystem localFs; static { @@ -107,20 +111,17 @@ public class TestMRWithDistributedCache TestCase.assertNotNull(cl.getResource("distributed.jar.inside3")); TestCase.assertNull(cl.getResource("distributed.jar.inside4")); - // Check that the symlink for the renaming was created in the cwd; - // This only happens for real for non-local jobtrackers. - // (The symlinks exist in "localRunner/" for local Jobtrackers, - // but the user has no way to get at them. - if (!"local".equals( - context.getConfiguration().get(JTConfig.JT_IPC_ADDRESS))) { - File symlinkFile = new File("distributed.first.symlink"); - TestCase.assertTrue("symlink distributed.first.symlink doesn't exist", symlinkFile.exists()); - TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1, symlinkFile.length()); - } + TestCase.assertTrue("symlink distributed.first.symlink doesn't exist", + symlinkFile.exists()); + TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1, + symlinkFile.length()); + + TestCase.assertFalse("second file should not be symlinked", + expectedAbsentSymlinkFile.exists()); } } - + private void testWithConf(Configuration conf) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException { // Create a temporary file of length 1. @@ -144,11 +145,7 @@ public class TestMRWithDistributedCache job.addFileToClassPath(second); job.addArchiveToClassPath(third); job.addCacheArchive(fourth.toUri()); - - // don't create symlink for LocalJobRunner - if (!"local".equals(conf.get(JTConfig.JT_IPC_ADDRESS))) { - job.createSymlink(); - } + job.createSymlink(); job.setMaxMapAttempts(1); // speed up failures job.submit(); @@ -157,10 +154,17 @@ public class TestMRWithDistributedCache /** Tests using the local job runner. */ public void testLocalJobRunner() throws Exception { + symlinkFile.delete(); // ensure symlink is not present (e.g. if test is + // killed part way through) + Configuration c = new Configuration(); c.set(JTConfig.JT_IPC_ADDRESS, "local"); c.set("fs.defaultFS", "file:///"); testWithConf(c); + + assertFalse("Symlink not removed by local job runner", + // Symlink target will have gone so can't use File.exists() + Arrays.asList(new File(".").list()).contains(symlinkFile.getName())); } private Path createTempFile(String filename, String contents)