hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r807543 [2/2] - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/filecache/ src/test/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapreduce/filecache/
Date Tue, 25 Aug 2009 10:27:54 GMT
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=807543&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
(added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
Tue Aug 25 10:27:53 2009
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+
+/**
+ * Tests the use of the
+ * {@link org.apache.hadoop.mapreduce.filecache.DistributedCache} within the
+ * full MR flow as well as the LocalJobRunner. This ought to be part of the
+ * filecache package, but that package is not currently in mapred, so cannot
+ * depend on MR for testing.
+ * 
+ * We use the distributed.* namespace for temporary files.
+ * 
+ * See {@link TestMiniMRLocalFS}, {@link TestMiniMRDFSCaching}, and
+ * {@link MRCaching} for other tests that test the distributed cache.
+ * 
+ * This test is not fast: it uses MiniMRCluster.
+ */
+public class TestMRWithDistributedCache extends TestCase {
+  private static final Path TEST_ROOT_DIR =
+    new Path(System.getProperty("test.build.data","/tmp"));
+  private static final Configuration conf = new Configuration();
+  private static final FileSystem localFs;
+  static {
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+  }
+
+  private static final Log LOG =
+    LogFactory.getLog(TestMRWithDistributedCache.class);
+
+  /**
+   * Mapper whose {@code setup} verifies the task-side view of the
+   * DistributedCache configured by {@link TestMRWithDistributedCache}:
+   * the number and sizes of localized files, extraction of archives, which
+   * entries are visible to the context class loader, and (for non-local
+   * job trackers) the symlink requested via the URI fragment.
+   * It does not override {@code map}.
+   */
+  public static class DistributedCacheChecker extends
+      Mapper<LongWritable, Text, NullWritable, NullWritable> {
+
+    @Override
+    public void setup(Context context) throws IOException {
+      Configuration conf = context.getConfiguration();
+      Path[] files = DistributedCache.getLocalCacheFiles(conf);
+      Path[] archives = DistributedCache.getLocalCacheArchives(conf);
+      // Localized cache entries are on the task's local disk, so inspect
+      // them through the local filesystem rather than whatever filesystem
+      // the configuration names as the default.
+      FileSystem fs = FileSystem.getLocal(conf);
+
+      // Check that 2 files and 2 archives are present (fail with a clear
+      // assertion rather than an NPE if nothing was localized).
+      TestCase.assertNotNull("No localized cache files", files);
+      TestCase.assertNotNull("No localized cache archives", archives);
+      TestCase.assertEquals(2, files.length);
+      TestCase.assertEquals(2, archives.length);
+
+      // Check lengths of the files: the first is the one-byte text file,
+      // the second is a jar.
+      TestCase.assertEquals(1, fs.getFileStatus(files[0]).getLen());
+      TestCase.assertTrue(fs.getFileStatus(files[1]).getLen() > 1);
+
+      // Check extraction of the archives
+      TestCase.assertTrue(fs.exists(new Path(archives[0],
+          "distributed.jar.inside3")));
+      TestCase.assertTrue(fs.exists(new Path(archives[1],
+          "distributed.jar.inside4")));
+
+      // Check the class loaders
+      LOG.info("Java Classpath: " + System.getProperty("java.class.path"));
+      ClassLoader cl = Thread.currentThread().getContextClassLoader();
+      // The second jar (a file) and the third jar (an archive) were added to
+      // the classpath, so both should be reachable via the class loader.
+      // The fourth jar was only added as a cache archive, so it must not be.
+      TestCase.assertNotNull(cl.getResource("distributed.jar.inside2"));
+      TestCase.assertNotNull(cl.getResource("distributed.jar.inside3"));
+      TestCase.assertNull(cl.getResource("distributed.jar.inside4"));
+
+
+      // Check that the symlink for the renaming was created in the cwd;
+      // This only happens for real for non-local jobtrackers.
+      // (The symlinks exist in "localRunner/" for local Jobtrackers,
+      // but the user has no way to get at them.)
+      if (!"local".equals(
+          context.getConfiguration().get("mapred.job.tracker"))) {
+        File symlinkFile = new File("distributed.first.symlink");
+        TestCase.assertTrue(symlinkFile.exists());
+        TestCase.assertEquals(1, symlinkFile.length());
+      }
+    }
+  }
+
+  /**
+   * Creates one plain file and three jars, registers them with the
+   * DistributedCache on {@code jobConf}, and runs a job whose mapper
+   * ({@link DistributedCacheChecker}) asserts on the localized state.
+   *
+   * @param jobConf configuration selecting the job runner under test
+   */
+  private void testWithConf(JobConf jobConf) throws IOException,
+      InterruptedException, ClassNotFoundException, URISyntaxException {
+    // Create a temporary file of length 1.
+    Path first = createTempFile("distributed.first", "x");
+    // Create three jars with a single file inside each of them.
+    Path second =
+        makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
+    Path third =
+        makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
+    Path fourth =
+        makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
+
+    // Creates the Job Configuration. The URI fragment requests a symlink
+    // named "distributed.first.symlink" in the task's working directory.
+    DistributedCache.addCacheFile(
+        new URI(first.toUri().toString() + "#distributed.first.symlink"),
+        jobConf);
+    DistributedCache.addFileToClassPath(second, jobConf);
+    DistributedCache.addArchiveToClassPath(third, jobConf);
+    DistributedCache.addCacheArchive(fourth.toUri(), jobConf);
+    DistributedCache.createSymlink(jobConf);
+
+    jobConf.setMaxMapAttempts(1); // speed up failures
+    Job job = new Job(jobConf);
+    job.setMapperClass(DistributedCacheChecker.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    FileInputFormat.setInputPaths(job, first);
+
+    job.submit();
+    assertTrue(job.waitForCompletion(false));
+  }
+
+  /** Tests using the local job runner. */
+  public void testLocalJobRunner() throws Exception {
+    JobConf c = new JobConf();
+    c.set("mapred.job.tracker", "local");
+    c.set("fs.default.name", "file:///");
+    testWithConf(c);
+  }
+
+  /** Tests using a full MiniMRCluster. */
+  public void testMiniMRJobRunner() throws Exception {
+    MiniMRCluster m = new MiniMRCluster(1, "file:///", 1);
+    try {
+      testWithConf(m.createJobConf());
+    } finally {
+      m.shutdown();
+    }
+  }
+
+  /**
+   * Writes {@code contents} to {@code TEST_ROOT_DIR/filename} on the local
+   * filesystem, closing the stream even if the write fails.
+   *
+   * @return the path of the written file
+   */
+  private Path createTempFile(String filename, String contents)
+      throws IOException {
+    Path path = new Path(TEST_ROOT_DIR, filename);
+    FSDataOutputStream os = localFs.create(path);
+    try {
+      os.writeBytes(contents);
+    } finally {
+      os.close();
+    }
+    return path;
+  }
+
+  /**
+   * Writes a jar at {@code p} containing a single entry named
+   * {@code distributed.jar.inside<index>}. Both streams are closed even if
+   * writing fails.
+   *
+   * @return {@code p}
+   */
+  private Path makeJar(Path p, int index) throws FileNotFoundException,
+      IOException {
+    FileOutputStream fos = new FileOutputStream(new File(p.toString()));
+    try {
+      JarOutputStream jos = new JarOutputStream(fos);
+      try {
+        ZipEntry ze = new ZipEntry("distributed.jar.inside" + index);
+        jos.putNextEntry(ze);
+        jos.write(("inside the jar!" + index).getBytes("UTF-8"));
+        jos.closeEntry();
+      } finally {
+        jos.close();
+      }
+    } finally {
+      // Harmless if jos.close() already closed it; guarantees release if
+      // the JarOutputStream could not be constructed.
+      fos.close();
+    }
+    return p;
+  }
+}

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=807543&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
(added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
Tue Aug 25 10:27:53 2009
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.filecache;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+
+public class TestTrackerDistributedCacheManager extends TestCase {
+  private static final String TEST_LOCAL_DIR_PROP = "test.local.dir";
+  private static final String TEST_CACHE_BASE_DIR =
+    new Path(System.getProperty("test.build.data","/tmp/cachebasedir"))
+    .toString().replace(' ', '+');
+  private static final String TEST_ROOT_DIR =
+    System.getProperty("test.build.data", "/tmp/distributedcache");
+  private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
+  private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
+  private Configuration conf;
+  private Path firstCacheFile;
+  private Path secondCacheFile;
+
+  /**
+   * Builds a local-filesystem configuration with a 5K cache limit and
+   * creates two 4K random files to be cached.
+   */
+  @Override
+  protected void setUp() throws IOException {
+    conf = new Configuration();
+    conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+    conf.set(TEST_LOCAL_DIR_PROP, TEST_ROOT_DIR);
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+    firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
+    secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
+    createTempFile(firstCacheFile);
+    createTempFile(secondCacheFile);
+  }
+
+  /**
+   * This is the typical flow for using the DistributedCache classes.
+   */
+  public void testManagerFlow() throws IOException {
+    TrackerDistributedCacheManager manager = 
+        new TrackerDistributedCacheManager(conf);
+    LocalDirAllocator localDirAllocator = 
+        new LocalDirAllocator(TEST_LOCAL_DIR_PROP);
+
+    // Configures a task/job with both a regular file and a "classpath" file.
+    Configuration subConf = new Configuration(conf);
+    DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
+    DistributedCache.addFileToClassPath(secondCacheFile, subConf);
+    TrackerDistributedCacheManager.determineTimestamps(subConf);
+
+    Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
+    FileOutputStream os = new FileOutputStream(new File(jobFile.toString()));
+    try {
+      subConf.writeXml(os);
+    } finally {
+      os.close();
+    }
+
+    TaskDistributedCacheManager handle =
+      manager.newTaskDistributedCacheManager(subConf);
+    assertNull("Local cache files should not be set before setup.",
+        DistributedCache.getLocalCacheFiles(subConf));
+    handle.setup(localDirAllocator, 
+        new File(new Path(TEST_ROOT_DIR, "workdir").toString()), "distcache");
+    Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
+    assertNotNull("Local cache files should be set after setup.",
+        localCacheFiles);
+    assertEquals(2, localCacheFiles.length);
+    Path cachedFirstFile = localCacheFiles[0];
+    Path cachedSecondFile = localCacheFiles[1];
+    assertFileLengthEquals(firstCacheFile, cachedFirstFile);
+    assertFalse("Paths should be different.", 
+        firstCacheFile.equals(cachedFirstFile));
+
+    assertEquals(1, handle.getClassPaths().size());
+    assertEquals(cachedSecondFile.toString(), handle.getClassPaths().get(0));
+
+    // Cleanup
+    handle.release();
+    manager.purgeCache();
+    assertFalse(pathToFile(cachedFirstFile).exists());
+  }
+
+
+  /**
+   * Test delete cache: localizes a 4K file, releases it, then localizes a
+   * second 4K file. With the 5K "local.cache.size" limit set in setUp, the
+   * second localization is expected to sweep away the released first entry,
+   * so the first localized path must no longer exist.
+   */
+  public void testDeleteCache() throws Exception {
+    TrackerDistributedCacheManager manager = 
+        new TrackerDistributedCacheManager(conf);
+    FileSystem localfs = FileSystem.getLocal(conf);
+
+    Path localCache1 = manager.getLocalCache(firstCacheFile.toUri(), conf, 
+        new Path(TEST_CACHE_BASE_DIR), null, false, 
+        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
+    manager.releaseCache(firstCacheFile.toUri(), conf);
+    // In the above code, we localized a file of size 4K and then released
+    // the cache, which allows it to be deleted once the limit is exceeded.
+    // The code below localizes another cache entry, which is designed to
+    // sweep away the first one.
+    manager.getLocalCache(secondCacheFile.toUri(), conf, 
+        new Path(TEST_CACHE_BASE_DIR), null, false, 
+        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
+    // Check the evicted entry directly: counting directory entries cannot
+    // distinguish "deleted" from "not deleted", and the base dir may be
+    // shared with other test files.
+    assertFalse("DistributedCache failed deleting old" + 
+        " cache when the cache store is full.",
+        localfs.exists(localCache1));
+  }
+  
+  /**
+   * Verifies that a file addressed through a scheme other than the default
+   * filesystem's ("fakefile", mapped to the local implementation) can be
+   * localized.
+   */
+  public void testFileSystemOtherThanDefault() throws Exception {
+    TrackerDistributedCacheManager manager =
+      new TrackerDistributedCacheManager(conf);
+    conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
+    Path fileToCache = new Path("fakefile:///"
+        + firstCacheFile.toUri().getPath());
+    Path result = manager.getLocalCache(fileToCache.toUri(), conf,
+        new Path(TEST_CACHE_BASE_DIR), null, false, System.currentTimeMillis(),
+        new Path(TEST_ROOT_DIR), false);
+    assertNotNull("DistributedCache failed to cache file on non-default"
+        + " filesystem.", result);
+  }
+
+  /**
+   * Writes {@code TEST_FILE_SIZE} random bytes to {@code p}, closing the
+   * stream even if the write fails.
+   */
+  static void createTempFile(Path p) throws IOException {
+    File f = new File(p.toString());
+    FileOutputStream os = new FileOutputStream(f);
+    try {
+      byte[] toWrite = new byte[TEST_FILE_SIZE];
+      new Random().nextBytes(toWrite);
+      os.write(toWrite);
+    } finally {
+      os.close();
+    }
+    FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE);
+  }
+
+  /** Removes the two source files created in {@link #setUp()}. */
+  @Override
+  protected void tearDown() throws IOException {
+    new File(firstCacheFile.toString()).delete();
+    new File(secondCacheFile.toString()).delete();
+  }
+
+  /** Asserts that the local files behind {@code a} and {@code b} have equal length. */
+  private void assertFileLengthEquals(Path a, Path b) 
+      throws FileNotFoundException {
+    assertEquals("File sizes mismatch.", 
+       pathToFile(a).length(), pathToFile(b).length());
+  }
+
+  /** Converts a local {@link Path} to a {@link File} via its string form. */
+  private File pathToFile(Path p) {
+    return new File(p.toString());
+  }
+}



Mime
View raw message