hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctre...@apache.org
Subject [1/2] hadoop git commit: MAPREDUCE-5951. Add support for the YARN Shared Cache.
Date Thu, 12 Oct 2017 18:13:57 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 13fcfb3d4 -> e46d5bb96


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
index d0d7a34..d347da5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
@@ -220,7 +220,7 @@ public class TestJobResourceUploader {
           destinationPathPrefix + "tmpArchives1.tgz#tmpArchivesfragment1.tgz" };
 
   private String jobjarSubmitDir = "/jobjar-submit-dir";
-  private String expectedJobJar = jobjarSubmitDir + "/job.jar";
+  private String basicExpectedJobJar = jobjarSubmitDir + "/job.jar";
 
   @Test
   public void testPathsWithNoFragNoSchemeRelative() throws IOException {
@@ -236,7 +236,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
-        expectedArchivesNoFrags, expectedJobJar);
+        expectedArchivesNoFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -254,7 +254,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
-        expectedArchivesNoFrags, expectedJobJar);
+        expectedArchivesNoFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -272,7 +272,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
-        expectedArchivesWithFrags, expectedJobJar);
+        expectedArchivesWithFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -290,7 +290,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
-        expectedArchivesWithFrags, expectedJobJar);
+        expectedArchivesWithFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -308,7 +308,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
-        expectedArchivesWithFrags, expectedJobJar);
+        expectedArchivesWithFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -326,7 +326,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
-        expectedArchivesNoFrags, expectedJobJar);
+        expectedArchivesNoFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -344,7 +344,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf, true);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithWildcard,
-        expectedArchivesNoFrags, expectedJobJar);
+        expectedArchivesNoFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -362,7 +362,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf, true);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
-        expectedArchivesWithFrags, expectedJobJar);
+        expectedArchivesWithFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -402,44 +402,39 @@ public class TestJobResourceUploader {
   private void runTmpResourcePathTest(JobResourceUploader uploader,
       ResourceConf rConf, JobConf jConf, String[] expectedFiles,
       String[] expectedArchives, String expectedJobJar) throws IOException {
-    rConf.setupJobConf(jConf);
-    // We use a pre and post job object here because we need the post job object
-    // to get the new values set during uploadResources, but we need the pre job
-    // to set the job jar because JobResourceUploader#uploadJobJar uses the Job
-    // interface not the JobConf. The post job is automatically created in
-    // validateResourcePaths.
-    Job jobPre = Job.getInstance(jConf);
-    uploadResources(uploader, jConf, jobPre);
-
-    validateResourcePaths(jConf, expectedFiles, expectedArchives,
-        expectedJobJar, jobPre);
+    Job job = rConf.setupJobConf(jConf);
+    uploadResources(uploader, job);
+    validateResourcePaths(job, expectedFiles, expectedArchives, expectedJobJar);
   }
 
-  private void uploadResources(JobResourceUploader uploader, JobConf jConf,
-      Job job) throws IOException {
-    Collection<String> files = jConf.getStringCollection("tmpfiles");
-    Collection<String> libjars = jConf.getStringCollection("tmpjars");
-    Collection<String> archives = jConf.getStringCollection("tmparchives");
-    String jobJar = jConf.getJar();
-    uploader.uploadFiles(jConf, files, new Path("/files-submit-dir"), null,
-        (short) 3);
-    uploader.uploadArchives(jConf, archives, new Path("/archives-submit-dir"),
-        null, (short) 3);
-    uploader.uploadLibJars(jConf, libjars, new Path("/libjars-submit-dir"),
-        null, (short) 3);
-    uploader.uploadJobJar(job, jobJar, new Path(jobjarSubmitDir), (short) 3);
+  private void uploadResources(JobResourceUploader uploader, Job job)
+      throws IOException {
+    Configuration conf = job.getConfiguration();
+    Collection<String> files = conf.getStringCollection("tmpfiles");
+    Collection<String> libjars = conf.getStringCollection("tmpjars");
+    Collection<String> archives = conf.getStringCollection("tmparchives");
+    Map<URI, FileStatus> statCache = new HashMap<>();
+    Map<String, Boolean> fileSCUploadPolicies = new HashMap<>();
+    String jobJar = job.getJar();
+    uploader.uploadFiles(job, files, new Path("/files-submit-dir"), null,
+        (short) 3, fileSCUploadPolicies, statCache);
+    uploader.uploadArchives(job, archives, new Path("/archives-submit-dir"),
+        null, (short) 3, fileSCUploadPolicies, statCache);
+    uploader.uploadLibJars(job, libjars, new Path("/libjars-submit-dir"), null,
+        (short) 3, fileSCUploadPolicies, statCache);
+    uploader.uploadJobJar(job, jobJar, new Path(jobjarSubmitDir), (short) 3,
+        statCache);
   }
 
-  private void validateResourcePaths(JobConf jConf, String[] expectedFiles,
-      String[] expectedArchives, String expectedJobJar, Job preJob)
+  private void validateResourcePaths(Job job, String[] expectedFiles,
+      String[] expectedArchives, String expectedJobJar)
       throws IOException {
-    Job j = Job.getInstance(jConf);
-    validateResourcePathsSub(j.getCacheFiles(), expectedFiles);
-    validateResourcePathsSub(j.getCacheArchives(), expectedArchives);
-    // We use a different job object here because the jobjar was set on a
-    // different job object
+    validateResourcePathsSub(job.getCacheFiles(), expectedFiles);
+    validateResourcePathsSub(job.getCacheArchives(), expectedArchives);
+    // uploadResources passed this same job to uploadJobJar, so the job jar
+    // path is read back from it.
     Assert.assertEquals("Job jar path is different than expected!",
-        expectedJobJar, preJob.getJar());
+        expectedJobJar, job.getJar());
   }
 
   private void validateResourcePathsSub(URI[] actualURIs,
@@ -645,7 +640,7 @@ public class TestJobResourceUploader {
       }
     }
 
-    private void setupJobConf(JobConf conf) {
+    private Job setupJobConf(JobConf conf) throws IOException {
       conf.set("tmpfiles",
           buildPathString("tmpFiles", this.numOfTmpFiles, ".txt"));
       conf.set("tmpjars",
@@ -675,6 +670,7 @@ public class TestJobResourceUploader {
       conf.setLong(MRJobConfig.MAX_RESOURCES_MB, this.maxResourcesMB);
       conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB,
           this.maxSingleResourceMB);
+      return new Job(conf);
     }
 
     // We always want absolute paths with a scheme in the DistributedCache, so

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploaderWithSharedCache.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploaderWithSharedCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploaderWithSharedCache.java
new file mode 100644
index 0000000..7598141
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploaderWithSharedCache.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.client.api.SharedCacheClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests the JobResourceUploader class with the shared cache.
+ */
+public class TestJobResourceUploaderWithSharedCache {
+  protected static final Log LOG = LogFactory
+      .getLog(TestJobResourceUploaderWithSharedCache.class);
+  private static MiniDFSCluster dfs;
+  private static FileSystem localFs;
+  private static FileSystem remoteFs;
+  private static Configuration conf = new Configuration();
+  private static Path testRootDir;
+  private static Path remoteStagingDir =
+      new Path(MRJobConfig.DEFAULT_MR_AM_STAGING_DIR);
+  // Content written into the single entry of each archive made by makeArchive.
+  private String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
+
+  // Start every test from an empty remote staging directory.
+  @Before
+  public void cleanup() throws Exception {
+    remoteFs.delete(remoteStagingDir, true);
+  }
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    // create configuration, dfs, file system
+    localFs = FileSystem.getLocal(conf);
+    testRootDir =
+        new Path("target",
+            TestJobResourceUploaderWithSharedCache.class.getName() + "-tmpDir")
+            .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+    dfs = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    remoteFs = dfs.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    try {
+      if (localFs != null) {
+        localFs.close();
+      }
+      if (remoteFs != null) {
+        remoteFs.close();
+      }
+    } catch (IOException ioe) {
+      LOG.info("IO exception in closing file system", ioe);
+    } finally {
+      // Stop the mini DFS cluster even if closing a file system failed.
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+
+  private class MyFileUploader extends JobResourceUploader {
+    // The mocked SharedCacheClient that will be fed into the FileUploader
+    private SharedCacheClient mockscClient = mock(SharedCacheClient.class);
+    // A real client for checksum calculation
+    private SharedCacheClient scClient = SharedCacheClient
+        .createSharedCacheClient();
+
+    MyFileUploader(FileSystem submitFs, Configuration conf)
+        throws IOException {
+      super(submitFs, false);
+      // Initialize the real client, but don't start it. We don't need or want
+      // to create an actual proxy because we only use this for mocking out the
+      // getFileChecksum method.
+      scClient.init(conf);
+      when(mockscClient.getFileChecksum(any(Path.class))).thenAnswer(
+          new Answer<String>() {
+            @Override
+            public String answer(InvocationOnMock invocation) throws Throwable {
+              Path file = (Path) invocation.getArguments()[0];
+              // Use the real scClient to generate the checksum. We use an
+              // answer/mock combination to avoid having to spy on a real
+              // SharedCacheClient object.
+              return scClient.getFileChecksum(file);
+            }
+          });
+    }
+
+    // This method is to prime the mock client with the correct checksum, so it
+    // looks like a given resource is present in the shared cache.
+    public void mockFileInSharedCache(Path localFile, URL remoteFile)
+        throws YarnException, IOException {
+      // when the resource is referenced, simply return the remote path to the
+      // caller
+      when(mockscClient.use(any(ApplicationId.class),
+          eq(scClient.getFileChecksum(localFile)))).thenReturn(remoteFile);
+    }
+
+    @Override
+    protected SharedCacheClient createSharedCacheClient(Configuration c) {
+      // Feed the mocked SharedCacheClient into the FileUploader logic
+      return mockscClient;
+    }
+  }
+
+  @Test
+  public void testSharedCacheDisabled() throws Exception {
+    JobConf jobConf = createJobConf();
+    Job job = new Job(jobConf);
+    job.setJobID(new JobID("567789", 1));
+
+    // shared cache is disabled by default
+    uploadFilesToRemoteFS(job, jobConf, 0, 0, 0, false);
+  }
+
+  @Test
+  public void testSharedCacheEnabled() throws Exception {
+    JobConf jobConf = createJobConf();
+    jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
+    Job job = new Job(jobConf);
+    job.setJobID(new JobID("567789", 1));
+
+    // shared cache is enabled for every file type
+    // the # of times SharedCacheClient.use is called should ==
+    // total # of files/libjars/archive/jobjar
+    uploadFilesToRemoteFS(job, jobConf, 8, 3, 2, false);
+  }
+
+  @Test
+  public void testSharedCacheEnabledWithJobJarInSharedCache()
+      throws Exception {
+    JobConf jobConf = createJobConf();
+    jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
+    Job job = new Job(jobConf);
+    job.setJobID(new JobID("567789", 1));
+
+    // shared cache is enabled for every file type and the job jar is made to
+    // look like it is already in the shared cache before upload
+    // the # of times SharedCacheClient.use is called should ==
+    // total # of files/libjars/archive/jobjar
+    uploadFilesToRemoteFS(job, jobConf, 8, 3, 2, true);
+  }
+
+  @Test
+  public void testSharedCacheArchivesAndLibjarsEnabled() throws Exception {
+    JobConf jobConf = createJobConf();
+    jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "archives,libjars");
+    Job job = new Job(jobConf);
+    job.setJobID(new JobID("567789", 1));
+
+    // shared cache is enabled for archives and libjars type
+    // the # of times SharedCacheClient.use is called should ==
+    // total # of libjars and archives
+    uploadFilesToRemoteFS(job, jobConf, 5, 1, 2, true);
+  }
+
+  // Builds a YARN JobConf with the shared cache enabled and the mini DFS
+  // cluster as the default file system.
+  private JobConf createJobConf() {
+    JobConf jobConf = new JobConf();
+    jobConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+    jobConf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+
+    jobConf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, remoteFs.getUri()
+        .toString());
+    return jobConf;
+  }
+
+  // Copies a local file into /tmp on the remote file system.
+  private Path copyToRemote(Path jar) throws IOException {
+    Path remoteFile = new Path("/tmp", jar.getName());
+    remoteFs.copyFromLocalFile(jar, remoteFile);
+    return remoteFile;
+  }
+
+  private void makeJarAvailableInSharedCache(Path jar,
+      MyFileUploader fileUploader) throws YarnException, IOException {
+    // copy file to remote file system
+    Path remoteFile = copyToRemote(jar);
+    // prime mocking so that it looks like this file is in the shared cache
+    fileUploader.mockFileInSharedCache(jar, URL.fromPath(remoteFile));
+  }
+
+  /**
+   * Adds local files, libjars, archives and a job jar to the job, both through
+   * the Job shared cache API and through the tmpfiles/tmpjars/tmparchives
+   * properties, uploads them and verifies the shared cache interactions.
+   *
+   * @param useCallCountExpected expected number of SharedCacheClient#use calls
+   * @param numOfFilesShouldBeUploadedToSharedCacheExpected expected number of
+   *          true entries in the file shared cache upload policies
+   * @param numOfArchivesShouldBeUploadedToSharedCacheExpected expected number
+   *          of true entries in the archive shared cache upload policies
+   * @param jobJarInSharedCacheBeforeUpload whether the job jar should look
+   *          like it is already in the shared cache before the upload
+   */
+  private void uploadFilesToRemoteFS(Job job, JobConf jobConf,
+      int useCallCountExpected,
+      int numOfFilesShouldBeUploadedToSharedCacheExpected,
+      int numOfArchivesShouldBeUploadedToSharedCacheExpected,
+      boolean jobJarInSharedCacheBeforeUpload) throws Exception {
+    MyFileUploader fileUploader = new MyFileUploader(remoteFs, jobConf);
+    SharedCacheConfig sharedCacheConfig = new SharedCacheConfig();
+    sharedCacheConfig.init(jobConf);
+
+    Path firstFile = createTempFile("first-input-file", "x");
+    Path secondFile = createTempFile("second-input-file", "xx");
+
+    // Add files to job conf via distributed cache API as well as command line
+    boolean fileAdded = Job.addFileToSharedCache(firstFile.toUri(), jobConf);
+    assertEquals(sharedCacheConfig.isSharedCacheFilesEnabled(), fileAdded);
+    if (!fileAdded) {
+      Path remoteFile = copyToRemote(firstFile);
+      job.addCacheFile(remoteFile.toUri());
+    }
+    jobConf.set("tmpfiles", secondFile.toString());
+
+    // Create jars with a single file inside them.
+    Path firstJar = makeJar(new Path(testRootDir, "distributed.first.jar"), 1);
+    Path secondJar =
+        makeJar(new Path(testRootDir, "distributed.second.jar"), 2);
+
+    // Verify duplicated contents can be handled properly.
+    Path thirdJar = new Path(testRootDir, "distributed.third.jar");
+    localFs.copyFromLocalFile(secondJar, thirdJar);
+
+    // make secondJar cache available
+    makeJarAvailableInSharedCache(secondJar, fileUploader);
+
+    // Add libjars to job conf via distributed cache API as well as command
+    // line
+    boolean libjarAdded =
+        Job.addFileToSharedCacheAndClasspath(firstJar.toUri(), jobConf);
+    assertEquals(sharedCacheConfig.isSharedCacheLibjarsEnabled(), libjarAdded);
+    if (!libjarAdded) {
+      Path remoteJar = copyToRemote(firstJar);
+      job.addFileToClassPath(remoteJar);
+    }
+
+    jobConf.set("tmpjars", secondJar.toString() + "," + thirdJar.toString());
+
+    Path firstArchive = makeArchive("first-archive.zip", "first-file");
+    Path secondArchive = makeArchive("second-archive.zip", "second-file");
+
+    // Add archives to job conf via distributed cache API as well as command
+    // line
+    boolean archiveAdded =
+        Job.addArchiveToSharedCache(firstArchive.toUri(), jobConf);
+    assertEquals(sharedCacheConfig.isSharedCacheArchivesEnabled(),
+        archiveAdded);
+    if (!archiveAdded) {
+      Path remoteArchive = copyToRemote(firstArchive);
+      job.addCacheArchive(remoteArchive.toUri());
+    }
+
+    jobConf.set("tmparchives", secondArchive.toString());
+
+    // Add job jar to job conf
+    Path jobJar = makeJar(new Path(testRootDir, "test-job.jar"), 4);
+    if (jobJarInSharedCacheBeforeUpload) {
+      makeJarAvailableInSharedCache(jobJar, fileUploader);
+    }
+    jobConf.setJar(jobJar.toString());
+
+    fileUploader.uploadResources(job, remoteStagingDir);
+
+    verify(fileUploader.mockscClient, times(useCallCountExpected)).use(
+        any(ApplicationId.class), anyString());
+
+    assertEquals(numOfFilesShouldBeUploadedToSharedCacheExpected,
+        countTrue(Job.getFileSharedCacheUploadPolicies(jobConf)));
+    assertEquals(numOfArchivesShouldBeUploadedToSharedCacheExpected,
+        countTrue(Job.getArchiveSharedCacheUploadPolicies(jobConf)));
+  }
+
+  /**
+   * Counts the values in a shared cache upload policy map that are true.
+   */
+  private static int countTrue(Map<String, Boolean> policies) {
+    int count = 0;
+    for (Boolean policy : policies.values()) {
+      if (Boolean.TRUE.equals(policy)) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  // Writes contents to a new local file under testRootDir, mode 700.
+  private Path createTempFile(String filename, String contents)
+      throws IOException {
+    Path path = new Path(testRootDir, filename);
+    try (FSDataOutputStream os = localFs.create(path)) {
+      os.writeBytes(contents);
+    }
+    localFs.setPermission(path, new FsPermission("700"));
+    return path;
+  }
+
+  // Writes a jar at p holding a single entry whose name and content embed
+  // index, then restricts it to mode 700.
+  private Path makeJar(Path p, int index) throws FileNotFoundException,
+      IOException {
+    try (FileOutputStream fos =
+        new FileOutputStream(new File(p.toUri().getPath()));
+        JarOutputStream jos = new JarOutputStream(fos)) {
+      ZipEntry ze = new ZipEntry("distributed.jar.inside" + index);
+      jos.putNextEntry(ze);
+      jos.write(("inside the jar!" + index).getBytes("UTF-8"));
+      jos.closeEntry();
+    }
+    localFs.setPermission(p, new FsPermission("700"));
+    return p;
+  }
+
+  // Writes a zip archive under testRootDir holding one entry, named after the
+  // path of filename under testRootDir, whose content is input.
+  private Path makeArchive(String archiveFile, String filename)
+      throws Exception {
+    Path archive = new Path(testRootDir, archiveFile);
+    Path file = new Path(testRootDir, filename);
+    DataOutputStream out = localFs.create(archive);
+    // Closing the ZipOutputStream also closes out.
+    try (ZipOutputStream zos = new ZipOutputStream(out)) {
+      ZipEntry ze = new ZipEntry(file.toString());
+      zos.putNextEntry(ze);
+      zos.write(input.getBytes("UTF-8"));
+      zos.closeEntry();
+    }
+    return archive;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 1baa467..a23ff34 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -338,16 +340,54 @@ public class YARNRunner implements ClientProtocol {
     }
   }
 
-  private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
-      throws IOException {
+  private LocalResource createApplicationResource(FileContext fs, Path p,
+      LocalResourceType type) throws IOException {
+    return createApplicationResource(fs, p, null, type,
+        LocalResourceVisibility.APPLICATION, false);
+  }
+
+  /**
+   * Builds a LocalResource for the file at {@code p}.
+   *
+   * @param fs file context used to stat {@code p} and resolve its path
+   * @param p path of the resource
+   * @param fileSymlink if non-null and non-empty, appended to the resource URL
+   *          as its fragment so the resource is localized under that name
+   * @param type resource type
+   * @param viz resource visibility
+   * @param uploadToSharedCache whether the resource should be uploaded to the
+   *          shared cache
+   * @return the populated LocalResource
+   * @throws IOException if {@code p} cannot be stat'ed or resolved, or its
+   *           URI cannot be combined with {@code fileSymlink}
+   */
+  private LocalResource createApplicationResource(FileContext fs, Path p,
+      String fileSymlink, LocalResourceType type, LocalResourceVisibility viz,
+      boolean uploadToSharedCache) throws IOException {
     LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
     FileStatus rsrcStat = fs.getFileStatus(p);
-    rsrc.setResource(URL.fromPath(fs
-        .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
+    // We need to be careful when converting from path to URL to add a fragment
+    // so that the symlink name when localized will be correct.
+    Path qualifiedPath =
+        fs.getDefaultFileSystem().resolvePath(rsrcStat.getPath());
+    URI uriWithFragment;
+    if (fileSymlink == null || fileSymlink.isEmpty()) {
+      uriWithFragment = qualifiedPath.toUri();
+    } else {
+      try {
+        uriWithFragment = new URI(qualifiedPath.toUri() + "#" + fileSymlink);
+      } catch (URISyntaxException e) {
+        throw new IOException("Error parsing local resource path."
+            + " Path was not able to be converted to a URI: " + qualifiedPath,
+            e);
+      }
+    }
+    rsrc.setResource(URL.fromURI(uriWithFragment));
     rsrc.setSize(rsrcStat.getLen());
     rsrc.setTimestamp(rsrcStat.getModificationTime());
     rsrc.setType(type);
-    rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+    rsrc.setVisibility(viz);
+    rsrc.setShouldBeUploadedToSharedCache(uploadToSharedCache);
     return rsrc;
   }
 
@@ -368,10 +408,21 @@ public class YARNRunner implements ClientProtocol {
             jobConfPath, LocalResourceType.FILE));
     if (jobConf.get(MRJobConfig.JAR) != null) {
       Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
+      // FileContext for the job jar; reused when building its LocalResource.
+      FileContext fccc =
+          FileContext.getFileContext(jobJarPath.toUri(), jobConf);
+      LocalResourceVisibility jobJarViz =
+          jobConf.getBoolean(MRJobConfig.JOBJAR_VISIBILITY,
+              MRJobConfig.JOBJAR_VISIBILITY_DEFAULT)
+                  ? LocalResourceVisibility.PUBLIC
+                  : LocalResourceVisibility.APPLICATION;
+      // We hard code the job.jar symlink because mapreduce code expects the
+      // job.jar to be named that way.
       LocalResource rc = createApplicationResource(
-          FileContext.getFileContext(jobJarPath.toUri(), jobConf),
-          jobJarPath,
-          LocalResourceType.PATTERN);
+          fccc, jobJarPath, MRJobConfig.JOB_JAR, LocalResourceType.PATTERN,
+          jobJarViz,
+          jobConf.getBoolean(MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY,
+              MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY_DEFAULT));
       String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
           JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
       rc.setPattern(pattern);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
index 4a2b857..a3ea26e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
@@ -132,6 +132,55 @@ public class TestLocalJobSubmission {
     }
   }
 
+  /**
+   * Test local job submission with a file option.
+   *
+   * @throws IOException if the test jar cannot be created
+   */
+  @Test
+  public void testLocalJobFilesOption() throws IOException {
+    runLocalSleepJobWithResourceOption("-files");
+  }
+
+  /**
+   * Test local job submission with an archive option.
+   *
+   * @throws IOException if the test jar cannot be created
+   */
+  @Test
+  public void testLocalJobArchivesOption() throws IOException {
+    runLocalSleepJobWithResourceOption("-archives");
+  }
+
+  /**
+   * Runs a SleepJob through ToolRunner with the local job tracker (-jt local)
+   * and args -m 1 -r 1 -mt 1 -rt 1, passing a freshly built test jar through
+   * the given generic option, and asserts that the job returns 0.
+   *
+   * @param resourceOption generic option used to pass the jar, e.g. "-files"
+   * @throws IOException if the test jar cannot be created
+   */
+  private void runLocalSleepJobWithResourceOption(String resourceOption)
+      throws IOException {
+    Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
+
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
+    conf.set(MRConfig.FRAMEWORK_NAME, "local");
+    final String[] args =
+        {"-jt", "local", resourceOption, jarPath.toString(), "-m", "1", "-r",
+            "1", "-mt", "1", "-rt", "1"};
+    int res = -1;
+    try {
+      res = ToolRunner.run(conf, new SleepJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with " + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      fail("Job failed");
+    }
+    assertEquals("dist job res is not 0:", 0, res);
+  }
+
   private Path makeJar(Path p) throws IOException {
     FileOutputStream fos = new FileOutputStream(new File(p.toString()));
     JarOutputStream jos = new JarOutputStream(fos);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
index 22cb530..6e280ad 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
@@ -1298,6 +1298,65 @@ public class TestMRJobs {
     jarFile.delete();
   }
 
+  /**
+   * Runs a job with the shared cache mode set to "enabled" and three libjars,
+   * then checks that it succeeds and that its tracking URL matches its job ID.
+   */
+  @Test
+  public void testSharedCache() throws Exception {
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    Job job = Job.getInstance(mrCluster.getConfig());
+
+    Configuration jobConf = job.getConfiguration();
+    jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
+
+    Path inputFile = createTempFile("input-file", "x");
+
+    // Create jars with a single file inside them.
+    Path second = makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
+    Path third = makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
+    Path fourth = makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
+
+    // Add libjars to job conf
+    jobConf.set("tmpjars", second.toString() + "," + third.toString() + ","
+        + fourth.toString());
+
+    // This test sets no job jar, so the jar containing SharedCacheChecker must
+    // be put on the task classpath or the mapper class won't be found.
+    Path sharedCacheCheckerJar =
+        new Path(JarFinder.getJar(SharedCacheChecker.class));
+    job.addFileToClassPath(sharedCacheCheckerJar.makeQualified(
+        localFs.getUri(), sharedCacheCheckerJar.getParent()));
+
+    job.setMapperClass(SharedCacheChecker.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+
+    FileInputFormat.setInputPaths(job, inputFile);
+
+    job.setMaxMapAttempts(1); // speed up failures
+
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
+    Assert.assertTrue(job.waitForCompletion(true));
+    Assert.assertTrue("Tracking URL was " + trackingUrl
+        + " but didn't Match Job ID " + jobId,
+        trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+  }
+
+  /**
+   * A mapper that keeps the default Mapper behavior; the shared cache test
+   * uses it only as a mapper class that must be loaded from the shipped jar.
+   */
+  public static class SharedCacheChecker extends
+      Mapper<LongWritable, Text, NullWritable, NullWritable> {
+  }
+
   public static class ConfVerificationMapper extends SleepMapper {
     @Override
     protected void setup(Context context)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-project/src/site/site.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml
index 44551fe..4512947 100644
--- a/hadoop-project/src/site/site.xml
+++ b/hadoop-project/src/site/site.xml
@@ -112,6 +112,7 @@
       <item name="Encrypted Shuffle" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html"/>
       <item name="Pluggable Shuffle/Sort" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/PluggableShuffleAndPluggableSort.html"/>
       <item name="Distributed Cache Deploy" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/DistributedCacheDeploy.html"/>
+      <item name="Support for YARN Shared Cache" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/SharedCacheSupport.html"/>
     </menu>
 
     <menu name="MapReduce REST APIs" inherit="top">


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message