mrunit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dbe...@apache.org
Subject svn commit: r1371380 - in /mrunit/trunk/src: main/java/org/apache/hadoop/mrunit/ main/java/org/apache/hadoop/mrunit/internal/util/ main/java/org/apache/hadoop/mrunit/mapreduce/ test/java/org/apache/hadoop/mrunit/ test/java/org/apache/hadoop/mrunit/mapr...
Date Thu, 09 Aug 2012 18:53:22 GMT
Author: dbeech
Date: Thu Aug  9 18:53:21 2012
New Revision: 1371380

URL: http://svn.apache.org/viewvc?rev=1371380&view=rev
Log:
MRUNIT-98: DistributedCache support

Added:
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/internal/util/DistCacheUtils.java
    mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestDistributedCache.java
    mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestDistributedCache.java
    mrunit/trunk/src/test/resources/
    mrunit/trunk/src/test/resources/testarchive.tar   (with props)
    mrunit/trunk/src/test/resources/testarchive.tar.abc   (with props)
    mrunit/trunk/src/test/resources/testfile
Modified:
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriver.java?rev=1371380&r1=1371379&r2=1371380&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriver.java Thu Aug  9 18:53:21
2012
@@ -116,23 +116,28 @@ public class MapDriver<K1, V1, K2, V2> e
 
   @Override
   public List<Pair<K2, V2>> run() throws IOException {
-    preRunChecks(myMapper);
-
-    final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
-        .createOutputCollectable(getConfiguration(),
-            getOutputCopyingOrInputFormatConfiguration());
-    final MockReporter reporter = new MockReporter(
-        MockReporter.ReporterType.Mapper, getCounters(),
-        getMapInputPath());
-
-    ReflectionUtils.setConf(myMapper, new JobConf(getConfiguration()));
-
-    for (Pair<K1, V1> kv : inputs) {
-      myMapper.map(kv.getFirst(), kv.getSecond(), outputCollectable, reporter);
+    try {
+      preRunChecks(myMapper);
+      initDistributedCache();
+      final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
+          .createOutputCollectable(getConfiguration(),
+              getOutputCopyingOrInputFormatConfiguration());
+      final MockReporter reporter = new MockReporter(
+          MockReporter.ReporterType.Mapper, getCounters(),
+          getMapInputPath());
+
+      ReflectionUtils.setConf(myMapper, new JobConf(getConfiguration()));
+
+      for (Pair<K1, V1> kv : inputs) {
+        myMapper.map(kv.getFirst(), kv.getSecond(), outputCollectable, reporter);
+      }
+
+      myMapper.close();
+
+      return outputCollectable.getOutputs();
+    } finally {
+      cleanupDistributedCache();
     }
-
-    myMapper.close();
-    return outputCollectable.getOutputs();
   }
 
   @Override

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java?rev=1371380&r1=1371379&r2=1371380&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java Thu Aug  9 18:53:21
2012
@@ -239,28 +239,33 @@ public class MapReduceDriver<K1, V1, K2,
 
   @Override
   public List<Pair<K3, V3>> run() throws IOException {
-    preRunChecks(myMapper, myReducer);
-
-    List<Pair<K2, V2>> mapOutputs = new ArrayList<Pair<K2, V2>>();
-
-    // run map component
-    LOG.debug("Starting map phase with mapper: " + myMapper);
-    mapOutputs.addAll(MapDriver.newMapDriver(myMapper)
-        .withCounters(getCounters()).withConfiguration(configuration)
-        .withAll(inputList).withMapInputPath(getMapInputPath()).run());
-
-    if (myCombiner != null) {
-      // User has specified a combiner. Run this and replace the mapper outputs
-      // with the result of the combiner.
-      LOG.debug("Starting combine phase with combiner: " + myCombiner);
-      mapOutputs = new ReducePhaseRunner<K2, V2>().runReduce(
-          shuffle(mapOutputs), myCombiner);
+    try {
+      preRunChecks(myMapper, myReducer);
+      initDistributedCache();
+      List<Pair<K2, V2>> mapOutputs = new ArrayList<Pair<K2, V2>>();
+
+      // run map component
+      LOG.debug("Starting map phase with mapper: " + myMapper);
+      mapOutputs.addAll(MapDriver.newMapDriver(myMapper)
+          .withCounters(getCounters()).withConfiguration(configuration)
+          .withAll(inputList).withMapInputPath(getMapInputPath()).run());
+
+      if (myCombiner != null) {
+        // User has specified a combiner. Run this and replace the mapper outputs
+        // with the result of the combiner.
+        LOG.debug("Starting combine phase with combiner: " + myCombiner);
+        mapOutputs = new ReducePhaseRunner<K2, V2>().runReduce(
+            shuffle(mapOutputs), myCombiner);
+      }
+
+      // Run the reduce phase.
+      LOG.debug("Starting reduce phase with reducer: " + myReducer);
+      
+      return new ReducePhaseRunner<K3, V3>()
+          .runReduce(shuffle(mapOutputs),myReducer);
+    } finally {
+      cleanupDistributedCache();
     }
-
-    // Run the reduce phase.
-    LOG.debug("Starting reduce phase with reducer: " + myReducer);
-    return new ReducePhaseRunner<K3, V3>().runReduce(shuffle(mapOutputs),
-        myReducer);
   }
 
   @Override

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java?rev=1371380&r1=1371379&r2=1371380&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java Thu Aug  9 18:53:21
2012
@@ -119,24 +119,27 @@ public class ReduceDriver<K1, V1, K2, V2
 
   @Override
   public List<Pair<K2, V2>> run() throws IOException {
-    preRunChecks(myReducer);
-
-    final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
-        .createOutputCollectable(getConfiguration(),
-            getOutputCopyingOrInputFormatConfiguration());
-    final MockReporter reporter = new MockReporter(
-        MockReporter.ReporterType.Reducer, getCounters(),
-        getMapInputPath());
-
-    ReflectionUtils.setConf(myReducer, new JobConf(getConfiguration()));
-
-    for (Pair<K1, List<V1>> kv : inputs) {
-      myReducer.reduce(kv.getFirst(), kv.getSecond().iterator(),
-          outputCollectable, reporter);
+    try {
+      preRunChecks(myReducer);
+      initDistributedCache();
+      final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
+          .createOutputCollectable(getConfiguration(),
+              getOutputCopyingOrInputFormatConfiguration());
+      final MockReporter reporter = new MockReporter(
+          MockReporter.ReporterType.Reducer, getCounters(),
+          getMapInputPath());
+
+      ReflectionUtils.setConf(myReducer, new JobConf(getConfiguration()));
+
+      for (Pair<K1, List<V1>> kv : inputs) {
+        myReducer.reduce(kv.getFirst(), kv.getSecond().iterator(),
+            outputCollectable, reporter);
+      }
+      myReducer.close();
+      return outputCollectable.getOutputs();
+    } finally {
+      cleanupDistributedCache();  
     }
-    
-    myReducer.close();
-    return outputCollectable.getOutputs();
   }
 
   @Override

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java?rev=1371380&r1=1371379&r2=1371380&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java Thu Aug  9 18:53:21
2012
@@ -20,7 +20,9 @@ package org.apache.hadoop.mrunit;
 import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
 import static org.junit.Assert.fail;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -30,10 +32,13 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
 import org.apache.hadoop.mrunit.internal.io.Serialization;
+import org.apache.hadoop.mrunit.internal.util.DistCacheUtils;
 import org.apache.hadoop.mrunit.types.Pair;
 
 public abstract class TestDriver<K1, V1, K2, V2, T extends TestDriver<K1, V1, K2, V2,
T>> {
@@ -45,10 +50,9 @@ public abstract class TestDriver<K1, V1,
   private boolean strictCountersChecking = false;
   protected List<Pair<Enum<?>, Long>> expectedEnumCounters;
   protected List<Pair<Pair<String, String>, Long>> expectedStringCounters;
-
   protected Configuration configuration;
   private Configuration outputCopyingOrInputFormatConf;
-
+  private File tmpDistCacheDir;
   protected CounterWrapper counterWrapper;
 
   protected Serialization serialization;
@@ -213,6 +217,106 @@ public abstract class TestDriver<K1, V1,
   }
 
   /**
+   * Adds a file to be put on the distributed cache. 
+   * The path may be relative and will try to be resolved from
+   * the classpath of the test. 
+   *  
+   * @param path path to the file
+   */
+  public void addCacheFile(String path) {
+    addCacheFile(DistCacheUtils.findResource(path));
+  }
+
+  /**
+   * Adds a file to be put on the distributed cache.
+   * @param uri uri of the file
+   */
+  public void addCacheFile(URI uri) {
+    DistributedCache.addCacheFile(uri, getConfiguration());
+  }
+
+  /**
+   * Set the list of files to put on the distributed cache
+   * @param files list of URIs
+   */
+  public void setCacheFiles(URI[] files) {
+    DistributedCache.setCacheFiles(files, getConfiguration());
+  }
+
+  /**
+   * Adds an archive to be put on the distributed cache. 
+   * The path may be relative and will try to be resolved from
+   * the classpath of the test. 
+   *  
+   * @param path path to the archive
+   */
+  public void addCacheArchive(String path) {
+    addCacheArchive(DistCacheUtils.findResource(path));
+  }
+
+  /**
+   * Adds an archive to be put on the distributed cache.
+   * @param uri uri of the archive
+   */
+  public void addCacheArchive(URI uri) {
+    DistributedCache.addCacheArchive(uri, getConfiguration());
+  }
+
+  /**
+   * Set the list of archives to put on the distributed cache
+   * @param archives list of URIs
+   */
+  public void setCacheArchives(URI[] archives) {
+    DistributedCache.setCacheArchives(archives, getConfiguration());
+  }
+
+  /**
+   * Adds a file to be put on the distributed cache. 
+   * The path may be relative and will try to be resolved from
+   * the classpath of the test. 
+   *  
+   * @param file path to the file
+   * @return the driver
+   */
+  public T withCacheFile(String file) {
+    addCacheFile(file);
+    return thisAsTestDriver();
+  }
+
+  /**
+   * Adds a file to be put on the distributed cache.
+   * @param file uri of the file
+   * @return the driver
+   */
+  public T withCacheFile(URI file) {
+    addCacheFile(file);
+    return thisAsTestDriver();
+  }
+
+  /**
+   * Adds an archive to be put on the distributed cache. 
+   * The path may be relative and will try to be resolved from
+   * the classpath of the test. 
+   *  
+   * @param archive path to the archive
+   * @return the driver
+   */
+  public T withCacheArchive(String archive) {
+    addCacheArchive(archive);
+    return thisAsTestDriver();
+  }
+
+  /**
+   * Adds an archive to be put on the distributed cache.
+   * @param file uri of the archive
+   * @return the driver
+   */
+  public T withCacheArchive(URI archive) {
+    addCacheArchive(archive);
+    return thisAsTestDriver();
+  }
+
+  /**
    * Runs the test but returns the result set instead of validating it (ignores
    * any addOutput(), etc calls made before this). 
    * 
@@ -230,6 +334,86 @@ public abstract class TestDriver<K1, V1,
   }
 
   /**
+   * Initialises the test distributed cache if required. This
+   * process is referred to as "localizing" by Hadoop, but since
+   * this is a unit test all files/archives are already local. 
+   * 
+   * Cached files are not moved but cached archives are extracted 
+   * into a temporary directory. 
+   * 
+   * @throws IOException
+   */
+  protected void initDistributedCache() throws IOException {
+
+    Configuration conf = getConfiguration();
+
+    if (isDistributedCacheInitialised(conf)) {
+      return;
+    }
+
+    List<Path> localArchives = new ArrayList<Path>();
+    List<Path> localFiles = new ArrayList<Path>();
+
+    if (DistributedCache.getCacheFiles(conf) != null) {
+      for (URI uri: DistributedCache.getCacheFiles(conf)) {
+        Path filePath = new Path(uri.getPath());
+        localFiles.add(filePath);
+      }
+      if (!localFiles.isEmpty()) {
+        DistributedCache.addLocalFiles(conf, 
+            DistCacheUtils.stringifyPathList(localFiles));
+      }
+    }
+    if (DistributedCache.getCacheArchives(conf) != null) {
+      for (URI uri: DistributedCache.getCacheArchives(conf)) {
+        Path archivePath = new Path(uri.getPath());
+        if (tmpDistCacheDir == null) {
+          tmpDistCacheDir = DistCacheUtils.createTempDirectory();
+        }
+        localArchives.add(DistCacheUtils.extractArchiveToTemp(
+            archivePath, tmpDistCacheDir));
+      }
+      if (!localArchives.isEmpty()) {
+        DistributedCache.addLocalArchives(conf, 
+            DistCacheUtils.stringifyPathList(localArchives));
+      }
+    }
+  }
+
+  /**
+   * Checks whether the distributed cache has been "localized", i.e.
+   * archives extracted and paths moved so that they can be accessed
+   * through {@link DistributedCache#getLocalCacheArchives()} and 
+   * {@link DistributedCache#getLocalCacheFiles()}
+   * 
+   * @param conf the configuration
+   * @return true if the cache is initialised
+   * @throws IOException
+   */
+  private boolean isDistributedCacheInitialised(Configuration conf) 
+      throws IOException {
+    return DistributedCache.getLocalCacheArchives(conf) != null ||
+        DistributedCache.getLocalCacheFiles(conf) != null;
+  }
+
+  /**
+   * Cleans up the distributed cache test by deleting the 
+   * temporary directory and any extracted cache archives
+   * contained within
+   * 
+   * @throws IOException 
+   *  if the local fs handle cannot be retrieved 
+   */
+  protected void cleanupDistributedCache() throws IOException {
+    if (tmpDistCacheDir != null) {
+      FileSystem fs = FileSystem.getLocal(getConfiguration());
+      LOG.debug("Deleting " + tmpDistCacheDir.toURI());
+      fs.delete(new Path(tmpDistCacheDir.toURI()), true);
+    }
+    tmpDistCacheDir = null;
+  }
+
+  /**
    * Runs the test but returns the result set instead of validating it (ignores
    * any addOutput(), etc calls made before this)
    * 

Added: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/internal/util/DistCacheUtils.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/internal/util/DistCacheUtils.java?rev=1371380&view=auto
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/internal/util/DistCacheUtils.java
(added)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/internal/util/DistCacheUtils.java
Thu Aug  9 18:53:21 2012
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.internal.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.RunJar;
+
+public class DistCacheUtils {
+
+  private static final Log LOG = LogFactory.getLog(DistCacheUtils.class);
+
+  private DistCacheUtils() {
+    //
+  }
+
+  /**
+   * Attempt to create a URI from a string path. First tries to load as a 
+   * class resource, and failing that, loads as a File.
+   *  
+   * @param path path to resource
+   * @return the uri of the resource
+   */
+  public static URI findResource(String path) {
+    URI uri = null;
+    try {
+      URL resourceUrl = DistCacheUtils.class.getClassLoader().getResource(path);
+      if (resourceUrl != null) {
+        uri = resourceUrl.toURI();
+      } else {
+        uri = new File(path).toURI();
+      }
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException(path + " could not be found", e);
+    }
+    if (uri == null) {
+      throw new IllegalArgumentException(path + " could not be found");
+    }
+    return uri;
+  }
+
+  /**
+   * Creates a comma separated list from a list of Path objects. 
+   * Method borrowed from Hadoop's TaskDistributedCacheManager
+   */
+  public static String stringifyPathList(List<Path> p){
+    if (p == null || p.isEmpty()) {
+      return null;
+    }
+    StringBuilder str = new StringBuilder(p.get(0).toString());
+    for (int i = 1; i < p.size(); i++){
+      str.append(",");
+      str.append(p.get(i).toString());
+    }
+    return str.toString();
+  }
+
+  /**
+   * Create a randomly named temporary directory
+   * 
+   * @return the file handle of the directory
+   * @throws IOException
+   */
+  public static File createTempDirectory() throws IOException {
+    File tmpDir = new File(System.getProperty("java.io.tmpdir"), 
+        "mrunit-" + UUID.randomUUID().toString());
+    LOG.debug("Creating temp directory " + tmpDir);
+    tmpDir.mkdirs();
+    return tmpDir;
+  }
+
+  /**
+   * Extract an archive to the temp directory.
+   * Code borrowed from Hadoop's TrackerDistributedCacheManager
+   * 
+   * @param cacheArchive the cache archive to extract
+   * @param tmpDir root location of temp directory
+   * @return the path to the extracted archive
+   * @throws IOException
+   */
+  public static Path extractArchiveToTemp(Path cacheArchive, File tmpDir) throws IOException
{
+    String tmpArchive = cacheArchive.getName().toLowerCase();
+    File srcFile = new File(cacheArchive.toString());
+    File destDir = new File(tmpDir, srcFile.getName());
+    LOG.debug(String.format("Extracting %s to %s",
+             srcFile.toString(), destDir.toString()));
+    if (tmpArchive.endsWith(".jar")) {
+      RunJar.unJar(srcFile, destDir);
+    } else if (tmpArchive.endsWith(".zip")) {
+      FileUtil.unZip(srcFile, destDir);
+    } else if (isTarFile(tmpArchive)) {
+      FileUtil.unTar(srcFile, destDir);
+    } else {
+      LOG.warn(String.format(
+          "Cache file %s specified as archive, but not valid extension.",
+          srcFile.toString()));
+      return cacheArchive;
+    }
+    return new Path(destDir.toString());
+  }
+
+  private static boolean isTarFile(String filename) {
+    return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
+           filename.endsWith(".tar"));
+  }
+
+}

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java?rev=1371380&r1=1371379&r2=1371380&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java Thu Aug 
9 18:53:21 2012
@@ -119,13 +119,15 @@ extends MapDriverBase<K1, V1, K2, V2, Ma
 
   @Override
   public List<Pair<K2, V2>> run() throws IOException {
-    preRunChecks(myMapper);
-
     try {
+      preRunChecks(myMapper);
+      initDistributedCache();
       myMapper.run(wrapper.getMockContext());
       return wrapper.getOutputs();
     } catch (final InterruptedException ie) {
       throw new IOException(ie);
+    } finally {
+      cleanupDistributedCache();
     }
   }
 

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java?rev=1371380&r1=1371379&r2=1371380&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java Thu
Aug  9 18:53:21 2012
@@ -237,28 +237,29 @@ public class MapReduceDriver<K1, V1, K2,
 
   @Override
   public List<Pair<K3, V3>> run() throws IOException {
-    preRunChecks(myMapper,myReducer);
-
-    List<Pair<K2, V2>> mapOutputs = new ArrayList<Pair<K2, V2>>();
-
-    // run map component
-    LOG.debug("Starting map phase with mapper: " + myMapper);
-    mapOutputs.addAll(MapDriver.newMapDriver(myMapper)
-        .withCounters(getCounters()).withConfiguration(configuration)
-        .withAll(inputList).withMapInputPath(getMapInputPath()).run());      
-
-    if (myCombiner != null) {
-      // User has specified a combiner. Run this and replace the mapper outputs
-      // with the result of the combiner.
-      LOG.debug("Starting combine phase with combiner: " + myCombiner);
-      mapOutputs = new ReducePhaseRunner<K2, V2>().runReduce(
-          shuffle(mapOutputs), myCombiner);
+    try {
+      preRunChecks(myMapper, myReducer);
+      initDistributedCache();
+      List<Pair<K2, V2>> mapOutputs = new ArrayList<Pair<K2, V2>>();
+      // run map component
+      LOG.debug("Starting map phase with mapper: " + myMapper);
+      mapOutputs.addAll(MapDriver.newMapDriver(myMapper)
+          .withCounters(getCounters()).withConfiguration(configuration)
+          .withAll(inputList).withMapInputPath(getMapInputPath()).run());
+      if (myCombiner != null) {
+        // User has specified a combiner. Run this and replace the mapper outputs
+        // with the result of the combiner.
+        LOG.debug("Starting combine phase with combiner: " + myCombiner);
+        mapOutputs = new ReducePhaseRunner<K2, V2>().runReduce(
+            shuffle(mapOutputs), myCombiner);
+      }
+      // Run the reduce phase.
+      LOG.debug("Starting reduce phase with reducer: " + myReducer);
+      return new ReducePhaseRunner<K3, V3>().runReduce(shuffle(mapOutputs),
+          myReducer);
+    } finally {
+      cleanupDistributedCache();
     }
-
-    // Run the reduce phase.
-    LOG.debug("Starting reduce phase with reducer: " + myReducer);
-    return new ReducePhaseRunner<K3, V3>().runReduce(shuffle(mapOutputs),
-        myReducer);
   }
 
   @Override

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java?rev=1371380&r1=1371379&r2=1371380&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java Thu Aug
 9 18:53:21 2012
@@ -126,13 +126,15 @@ public class ReduceDriver<K1, V1, K2, V2
 
   @Override
   public List<Pair<K2, V2>> run() throws IOException {
-    preRunChecks(myReducer);
-
     try {
+      preRunChecks(myReducer);
+      initDistributedCache();
       myReducer.run(wrapper.getMockContext());
       return wrapper.getOutputs();
     } catch (final InterruptedException ie) {
       throw new IOException(ie);
+    } finally {
+      cleanupDistributedCache();
     }
   }
 

Added: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestDistributedCache.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestDistributedCache.java?rev=1371380&view=auto
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestDistributedCache.java (added)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestDistributedCache.java Thu Aug
 9 18:53:21 2012
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDistributedCache {
+
+  private static final Text DUMMY = new Text("DUMMY");
+
+  private Mapper<Text,Text,Text,Text> mapper = new TestDistributedCacheMapperAndReducer();
+  private Reducer<Text,Text,Text,Text> reducer = new TestDistributedCacheMapperAndReducer();
+
+  private MapDriver<Text,Text,Text,Text> mapDriver = 
+      MapDriver.newMapDriver(mapper);
+  private ReduceDriver<Text,Text,Text,Text> reduceDriver = 
+      ReduceDriver.newReduceDriver(reducer);
+  private MapReduceDriver<Text,Text,Text,Text,Text,Text> mapReduceDriver = 
+      MapReduceDriver.newMapReduceDriver();
+
+  /**
+   * A dual Mapper and Reducer class which loads files / archives from distributed
+   * cache and outputs the filenames as keys, and whether the cache item is a file
+   * or directory ("file" or "dir") as value
+   */
+  private static class TestDistributedCacheMapperAndReducer extends MapReduceBase 
+    implements Mapper<Text,Text,Text,Text>, Reducer<Text,Text,Text,Text> {
+
+    private static final Text DIR = new Text("dir");
+    private static final Text FILE = new Text("file");
+
+    private List<Path> cachePaths = new ArrayList<Path>();
+    private FileSystem fs;
+    private Text outputKey = new Text();
+
+    @Override
+    public void configure(JobConf job) {
+      try {
+        fs = FileSystem.get(job);
+        cachePaths.clear();
+        Path[] localCacheArchives = DistributedCache.getLocalCacheArchives(job);
+        if (localCacheArchives != null) {
+          cachePaths.addAll(Arrays.asList(localCacheArchives));
+        }
+        Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(job);
+        if (localCacheFiles != null) {
+          cachePaths.addAll(Arrays.asList(localCacheFiles));
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void map(Text key, Text value,
+        OutputCollector<Text, Text> output, Reporter reporter)
+        throws IOException {
+      outputCachePaths(output, reporter);
+    }
+
+    @Override
+    public void reduce(Text key, Iterator<Text> values,
+        OutputCollector<Text, Text> output, Reporter reporter)
+        throws IOException {
+      outputCachePaths(output, reporter);
+    }
+
+    private void outputCachePaths(OutputCollector<Text, Text> output,
+        Reporter reporter) throws IOException {
+      for (Path path: cachePaths) {
+        outputPath("", path, output, reporter);
+      }
+    }
+
+    private void outputPath(String parentPath, Path path, 
+        OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+      FileStatus fstat = fs.getFileStatus(path);
+      boolean isDir = fstat.isDir();
+      if (parentPath.length() > 0) {
+        //append parent name if the file was extracted from an archive
+        outputKey.set(parentPath + "/" + path.getName());
+      } else {
+        outputKey.set(path.getName());
+      }
+      // output the file info
+      output.collect(outputKey, isDir ? DIR : FILE);
+      // recurse into extracted directories
+      if (isDir) {
+        for (FileStatus subStat: fs.listStatus(path)) {
+          outputPath(path.getName(), subStat.getPath(), output, reporter);
+        }
+      }
+    }
+  }
+
+  @Before
+  public void setup() {
+    // all drivers need dummy input to drive the tests
+    mapDriver.withInput(DUMMY, DUMMY);
+    reduceDriver.withInput(DUMMY, Arrays.asList(DUMMY,DUMMY,DUMMY));
+    mapReduceDriver.withInput(DUMMY, DUMMY);
+  }
+
+  /** withCacheFile() on the driver should expose the file to the mapper. */
+  @Test
+  public void testAddCacheFileToMapperUsingDriverMethod() throws IOException
+  {
+    mapDriver.withCacheFile("testfile")
+      .withOutput(new Text("testfile"), new Text("file")).runTest();
+  }
+
+  /**
+   * A file registered via the static DistributedCache API on a plain
+   * Configuration should also be visible to the mapper.
+   */
+  @Test
+  public void testAddCacheFileToMapperUsingStaticMethod() throws Exception
+  {
+    Configuration conf = new Configuration();
+    DistributedCache.addCacheFile(new File("README.txt").toURI(), conf);
+    mapDriver.withConfiguration(conf)
+      .withOutput(new Text("README.txt"), new Text("file")).runTest();
+  }
+
+  /** withCacheArchive() should extract the archive for the mapper. */
+  @Test
+  public void testAddCacheArchiveToMapperUsingDriverMethod() throws IOException
+  {
+    // Cache archives should be extracted into a directory named the same
+    // as the original file
+    mapDriver.withCacheArchive("testarchive.tar")
+      .withOutput(new Text("testarchive.tar"), new Text("dir"))
+      .withOutput(new Text("testarchive.tar/a"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/b"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/c"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/d"), new Text("file"))
+      .runTest();
+  }
+
+  /** An unrecognised archive extension must not trigger extraction. */
+  @Test
+  public void testAddCacheArchiveWithInvalidExtension() throws IOException
+  {
+    // Cache archives with a non-archive extension should be passed
+    // through without being expanded
+    mapDriver.withCacheArchive("testarchive.tar.abc")
+      .withOutput(new Text("testarchive.tar.abc"), new Text("file"))
+      .runTest();
+  }
+
+  /** withCacheFile() on the driver should expose the file to the reducer. */
+  @Test
+  public void testAddCacheFileToReducerUsingDriverMethod() throws IOException
+  {
+    reduceDriver.withCacheFile("testfile")
+      .withOutput(new Text("testfile"), new Text("file")).runTest();
+  }
+  
+  /**
+   * A file registered via the static DistributedCache API on a plain
+   * Configuration should also be visible to the reducer.
+   */
+  @Test
+  public void testAddCacheFileToReducerUsingStaticMethod() throws Exception
+  {
+    Configuration conf = new Configuration();
+    DistributedCache.addCacheFile(new File("README.txt").toURI(), conf);
+    reduceDriver.withConfiguration(conf)
+      .withOutput(new Text("README.txt"), new Text("file")).runTest();
+  }
+
+  /** withCacheArchive() should extract the archive for the reducer. */
+  @Test
+  public void testAddCacheArchiveToReducerUsingDriverMethod() throws IOException
+  {
+    reduceDriver.withCacheArchive("testarchive.tar")
+      .withOutput(new Text("testarchive.tar"), new Text("dir"))
+      .withOutput(new Text("testarchive.tar/a"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/b"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/c"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/d"), new Text("file"))
+      .runTest();
+  }
+
+  /** The cache must be visible to the map stage of the combined driver. */
+  @Test
+  public void testAddCacheArchiveToMapReduceUsingDriverMethod() throws IOException
+  {
+    // Tests that dist cache files are correctly processed by mapper
+    // as part of the mapreduce driver pipeline
+    mapReduceDriver.setMapper(mapper);
+    // IdentityReducer passes the mapper's records through unchanged
+    mapReduceDriver.setReducer(new IdentityReducer<Text,Text>());
+    mapReduceDriver.withCacheArchive("testarchive.tar")
+      .withOutput(new Text("testarchive.tar"), new Text("dir"))
+      .withOutput(new Text("testarchive.tar/a"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/b"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/c"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/d"), new Text("file"))
+      .runTest();
+  }
+  
+  /** The cache must be visible to the reduce stage of the combined driver. */
+  @Test
+  public void testAddCacheArchiveToMapReduceUsingDriverMethod2() throws IOException
+  {
+    // Tests that dist cache files are correctly processed by reducer
+    // as part of the mapreduce driver pipeline
+    // IdentityMapper passes the dummy record through to the reducer
+    mapReduceDriver.setMapper(new IdentityMapper<Text,Text>());
+    mapReduceDriver.setReducer(reducer);
+    mapReduceDriver.withCacheArchive("testarchive.tar")
+      .withOutput(new Text("testarchive.tar"), new Text("dir"))
+      .withOutput(new Text("testarchive.tar/a"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/b"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/c"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/d"), new Text("file"))
+      .runTest();
+  }
+
+}

Added: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestDistributedCache.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestDistributedCache.java?rev=1371380&view=auto
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestDistributedCache.java
(added)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestDistributedCache.java
Thu Aug  9 18:53:21 2012
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests MRUnit's DistributedCache support against the new
+ * (org.apache.hadoop.mapreduce) API drivers: cache files and archives
+ * added either through the driver methods or the static
+ * DistributedCache API must be visible to mappers and reducers.
+ */
+public class TestDistributedCache {
+
+  private static final Text DUMMY = new Text("DUMMY");
+
+  private Mapper<Text,Text,Text,Text> mapper = new TestDistributedCacheMapper();
+  private Reducer<Text,Text,Text,Text> reducer = new TestDistributedCacheReducer();
+
+  private MapDriver<Text,Text,Text,Text> mapDriver = 
+      MapDriver.newMapDriver(mapper);
+  private ReduceDriver<Text,Text,Text,Text> reduceDriver = 
+      ReduceDriver.newReduceDriver(reducer);
+  private MapReduceDriver<Text,Text,Text,Text,Text,Text> mapReduceDriver = 
+      MapReduceDriver.newMapReduceDriver();
+
+  // output values marking a cache entry as a directory or a plain file
+  static final Text DIR = new Text("dir");
+  static final Text FILE = new Text("file");
+  
+  /**
+   * A mapper class which loads files / archives from distributed
+   * cache and outputs the filenames as keys, and whether the cache item is a file
+   * or directory ("file" or "dir") as value
+   */
+  private static class TestDistributedCacheMapper
+    extends Mapper<Text,Text,Text,Text> {
+
+    private List<Path> cachePaths;
+
+    // @Override guards against a signature typo silently skipping setup
+    @Override
+    protected void setup(Context context) 
+        throws IOException, InterruptedException {
+      cachePaths = TestDistributedCacheUtils.createCachePathList(context);
+    }
+
+    @Override
+    public void map(Text key, Text value, Context context)
+        throws IOException, InterruptedException {
+      TestDistributedCacheUtils.outputCachePaths(cachePaths, context);
+    }
+  }
+
+  /**
+   * A reducer class which loads files / archives from distributed
+   * cache and outputs the filenames as keys, and whether the cache item is a file
+   * or directory ("file" or "dir") as value
+   */
+  private static class TestDistributedCacheReducer
+    extends Reducer<Text,Text,Text,Text> {
+  
+    private List<Path> cachePaths;
+  
+    // @Override guards against a signature typo silently skipping setup
+    @Override
+    protected void setup(Context context) 
+        throws IOException, InterruptedException {
+      cachePaths = TestDistributedCacheUtils.createCachePathList(context);
+    }
+  
+    @Override
+    public void reduce(Text key, Iterable<Text> value, Context context)
+        throws IOException, InterruptedException {
+      TestDistributedCacheUtils.outputCachePaths(cachePaths, context);
+    }
+  }
+
+  /** Shared helpers for collecting and reporting distributed-cache paths. */
+  private static class TestDistributedCacheUtils {
+
+    /**
+     * Collects the local paths of all cache archives and cache files
+     * registered in the task configuration; empty list when none cached.
+     */
+    private static List<Path> createCachePathList(
+        TaskInputOutputContext<Text,Text,Text,Text> context) throws IOException {
+      Configuration conf = context.getConfiguration();
+      List<Path> cachePaths = new ArrayList<Path>();
+      Path[] localCacheArchives = DistributedCache.getLocalCacheArchives(conf);
+      if (localCacheArchives != null) {
+        cachePaths.addAll(Arrays.asList(localCacheArchives));
+      }
+      Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(conf);
+      if (localCacheFiles != null) {
+        cachePaths.addAll(Arrays.asList(localCacheFiles));
+      }
+      return cachePaths;
+    }
+
+    /** Emits one (name, "file"|"dir") record for each cached path. */
+    private static void outputCachePaths(List<Path> cachePaths, 
+        TaskInputOutputContext<Text,Text,Text,Text> context) 
+        throws IOException, InterruptedException {
+      // create the FileSystem once and reuse it for the whole traversal
+      // instead of re-fetching it on every recursive outputPath call
+      FileSystem fs = FileSystem.get(context.getConfiguration());
+      for (Path path: cachePaths) {
+        outputPath(fs, "", path, context);
+      }
+    }
+
+    /**
+     * Writes (name, "file"|"dir") for the given path; entries extracted
+     * from an archive are prefixed with the parent directory's name, and
+     * directories are recursed into so archive contents are listed too.
+     */
+    private static void outputPath(FileSystem fs, String parentPath, Path path, 
+        TaskInputOutputContext<Text,Text,Text,Text> context) 
+            throws IOException, InterruptedException {
+      FileStatus fstat = fs.getFileStatus(path);
+      Text outputKey = new Text();
+      boolean isDir = fstat.isDir();
+      if (parentPath.length() > 0) {
+        //append parent name if the file was extracted from an archive
+        outputKey.set(parentPath + "/" + path.getName());
+      } else {
+        outputKey.set(path.getName());
+      }
+      // output the file info
+      context.write(outputKey, isDir ? DIR : FILE);
+      // recurse into extracted directories
+      if (isDir) {
+        for (FileStatus subStat: fs.listStatus(path)) {
+          outputPath(fs, path.getName(), subStat.getPath(), context);
+        }
+      }
+    }
+  }
+
+  /** Gives each driver a dummy input record so runTest() will execute. */
+  @Before
+  public void setup() {
+    // all drivers need dummy input to drive the tests
+    mapDriver.withInput(DUMMY, DUMMY);
+    reduceDriver.withInput(DUMMY, Arrays.asList(DUMMY,DUMMY,DUMMY));
+    mapReduceDriver.withInput(DUMMY, DUMMY);
+  }
+
+  /** withCacheFile() on the driver should expose the file to the mapper. */
+  @Test
+  public void testAddCacheFileToMapperUsingDriverMethod() throws IOException
+  {
+    mapDriver.withCacheFile("testfile")
+      .withOutput(new Text("testfile"), new Text("file")).runTest();
+  }
+
+  /** A file added via the static DistributedCache API is also visible. */
+  @Test
+  public void testAddCacheFileToMapperUsingStaticMethod() throws Exception
+  {
+    Configuration conf = new Configuration();
+    DistributedCache.addCacheFile(new File("README.txt").toURI(), conf);
+    mapDriver.withConfiguration(conf)
+      .withOutput(new Text("README.txt"), new Text("file")).runTest();
+  }
+
+  @Test
+  public void testAddCacheArchiveToMapperUsingDriverMethod() throws IOException
+  {
+    // Cache archives should be extracted into a directory named the same
+    // as the original file
+    mapDriver.withCacheArchive("testarchive.tar")
+      .withOutput(new Text("testarchive.tar"), new Text("dir"))
+      .withOutput(new Text("testarchive.tar/a"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/b"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/c"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/d"), new Text("file"))
+      .runTest();
+  }
+
+  @Test
+  public void testAddCacheArchiveWithInvalidExtension() throws IOException
+  {
+    // Cache archives with a non-archive extension should be passed
+    // through without being expanded
+    mapDriver.withCacheArchive("testarchive.tar.abc")
+      .withOutput(new Text("testarchive.tar.abc"), new Text("file"))
+      .runTest();
+  }
+
+  /** withCacheFile() on the driver should expose the file to the reducer. */
+  @Test
+  public void testAddCacheFileToReducerUsingDriverMethod() throws IOException
+  {
+    reduceDriver.withCacheFile("testfile")
+      .withOutput(new Text("testfile"), new Text("file")).runTest();
+  }
+
+  /** A file added via the static DistributedCache API is also visible. */
+  @Test
+  public void testAddCacheFileToReducerUsingStaticMethod() throws Exception
+  {
+    Configuration conf = new Configuration();
+    DistributedCache.addCacheFile(new File("README.txt").toURI(), conf);
+    reduceDriver.withConfiguration(conf)
+      .withOutput(new Text("README.txt"), new Text("file")).runTest();
+  }
+
+  /** withCacheArchive() should extract the archive for the reducer. */
+  @Test
+  public void testAddCacheArchiveToReducerUsingDriverMethod() throws IOException
+  {
+    reduceDriver.withCacheArchive("testarchive.tar")
+      .withOutput(new Text("testarchive.tar"), new Text("dir"))
+      .withOutput(new Text("testarchive.tar/a"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/b"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/c"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/d"), new Text("file"))
+      .runTest();
+  }
+
+  @Test
+  public void testAddCacheArchiveToMapReduceUsingDriverMethod() throws IOException
+  {
+    // Tests that dist cache files are correctly processed by mapper
+    // as part of the mapreduce driver pipeline
+    mapReduceDriver.setMapper(mapper);
+    // the base Reducer of the new API passes records through unchanged
+    mapReduceDriver.setReducer(new Reducer<Text,Text,Text,Text>());
+    mapReduceDriver.withCacheArchive("testarchive.tar")
+      .withOutput(new Text("testarchive.tar"), new Text("dir"))
+      .withOutput(new Text("testarchive.tar/a"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/b"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/c"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/d"), new Text("file"))
+      .runTest();
+  }
+
+  @Test
+  public void testAddCacheArchiveToMapReduceUsingDriverMethod2() throws IOException
+  {
+    // Tests that dist cache files are correctly processed by reducer
+    // as part of the mapreduce driver pipeline
+    // the base Mapper of the new API passes records through unchanged
+    mapReduceDriver.setMapper(new Mapper<Text,Text,Text,Text>());
+    mapReduceDriver.setReducer(reducer);
+    mapReduceDriver.withCacheArchive("testarchive.tar")
+      .withOutput(new Text("testarchive.tar"), new Text("dir"))
+      .withOutput(new Text("testarchive.tar/a"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/b"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/c"), new Text("file"))
+      .withOutput(new Text("testarchive.tar/d"), new Text("file"))
+      .runTest();
+  }
+
+}

Added: mrunit/trunk/src/test/resources/testarchive.tar
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/resources/testarchive.tar?rev=1371380&view=auto
==============================================================================
Binary file - no diff available.

Propchange: mrunit/trunk/src/test/resources/testarchive.tar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: mrunit/trunk/src/test/resources/testarchive.tar.abc
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/resources/testarchive.tar.abc?rev=1371380&view=auto
==============================================================================
Binary file - no diff available.

Propchange: mrunit/trunk/src/test/resources/testarchive.tar.abc
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: mrunit/trunk/src/test/resources/testfile
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/resources/testfile?rev=1371380&view=auto
==============================================================================
--- mrunit/trunk/src/test/resources/testfile (added)
+++ mrunit/trunk/src/test/resources/testfile Thu Aug  9 18:53:21 2012
@@ -0,0 +1 @@
+hello
\ No newline at end of file



Mime
View raw message