incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [3/4] git commit: CRUNCH-24 : make testsuite sutable for CI
Date Thu, 26 Jul 2012 05:48:26 GMT
CRUNCH-24 : make testsuite sutable for CI

- Used TemporaryPath for setting crunch.tmp.dir in all IT
- Modified MRPipeline setConfig method, to refresh tempDirectory
- UnionCollectionIT :create pipeline in @Before and set crunch.tmp.dir using TemporaryFolder

Signed-off-by: jwills <jwills@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/627bb42d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/627bb42d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/627bb42d

Branch: refs/heads/master
Commit: 627bb42d6a23d3923c0d432478f170315a7df026
Parents: 6f549ed
Author: Rahul Sharma <rsharma@xebia.com>
Authored: Sun Jul 22 17:22:55 2012 +0530
Committer: jwills <jwills@apache.org>
Committed: Wed Jul 25 22:23:38 2012 -0700

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/CollectionsIT.java   |   12 ++++-
 crunch/src/it/java/org/apache/crunch/MapsIT.java   |   16 ++++--
 .../it/java/org/apache/crunch/MaterializeIT.java   |   18 +++++--
 .../java/org/apache/crunch/MaterializeToMapIT.java |    7 ++-
 .../java/org/apache/crunch/MultipleOutputIT.java   |    5 +-
 .../org/apache/crunch/PCollectionGetSizeIT.java    |   13 +++--
 .../java/org/apache/crunch/PTableKeyValueIT.java   |    7 ++-
 .../src/it/java/org/apache/crunch/PageRankIT.java  |   12 +++-
 .../it/java/org/apache/crunch/TermFrequencyIT.java |    5 +-
 .../src/it/java/org/apache/crunch/TextPairIT.java  |    7 ++-
 crunch/src/it/java/org/apache/crunch/TfIdfIT.java  |    5 +-
 .../org/apache/crunch/TupleNClassCastBugIT.java    |    5 +-
 .../src/it/java/org/apache/crunch/WordCountIT.java |   13 +++--
 .../crunch/impl/mr/collect/UnionCollectionIT.java  |   29 +++++-----
 .../crunch/io/avro/AvroFileSourceTargetIT.java     |   11 +++-
 .../org/apache/crunch/io/avro/AvroReflectIT.java   |    7 ++-
 .../it/java/org/apache/crunch/lib/AggregateIT.java |   15 ++++--
 .../java/org/apache/crunch/lib/AvroTypeSortIT.java |    7 ++-
 .../it/java/org/apache/crunch/lib/CogroupIT.java   |    5 +-
 .../src/it/java/org/apache/crunch/lib/SetIT.java   |    8 +++-
 .../src/it/java/org/apache/crunch/lib/SortIT.java  |   41 ++++++++------
 .../apache/crunch/lib/SpecificAvroGroupByIT.java   |   10 +++-
 .../org/apache/crunch/lib/join/JoinTester.java     |    9 +++-
 .../org/apache/crunch/lib/join/MapsideJoinIT.java  |    9 +++-
 .../crunch/lib/join/MultiAvroSchemaJoinIT.java     |    7 ++-
 .../java/org/apache/crunch/test/TemporaryPath.java |   42 ++++++++-------
 .../java/org/apache/crunch/impl/mr/MRPipeline.java |    3 +-
 27 files changed, 219 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CollectionsIT.java b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
index c6a9c77..e6be525 100644
--- a/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
+++ b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
@@ -24,11 +24,16 @@ import java.util.Collection;
 
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -72,14 +77,17 @@ public class CollectionsIT {
         .combineValues(CombineFn.<String, Collection<String>> aggregator(new AggregateStringListFn()));
   }
 
+  @Rule
+  public TemporaryPath temporaryPath= new TemporaryPath();
+
   @Test
   public void testWritables() throws IOException {
-    run(new MRPipeline(CollectionsIT.class), WritableTypeFamily.getInstance());
+    run(new MRPipeline(CollectionsIT.class, temporaryPath.setTempLoc(new Configuration())), WritableTypeFamily.getInstance());
   }
 
   @Test
   public void testAvro() throws IOException {
-    run(new MRPipeline(CollectionsIT.class), AvroTypeFamily.getInstance());
+    run(new MRPipeline(CollectionsIT.class, temporaryPath.setTempLoc(new Configuration())), AvroTypeFamily.getInstance());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/MapsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MapsIT.java b/crunch/src/it/java/org/apache/crunch/MapsIT.java
index b1c1298..e0911c9 100644
--- a/crunch/src/it/java/org/apache/crunch/MapsIT.java
+++ b/crunch/src/it/java/org/apache/crunch/MapsIT.java
@@ -20,29 +20,37 @@ package org.apache.crunch;
 import java.util.Map;
 
 import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.crunch.test.FileHelper;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
 public class MapsIT {
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
   @Test
   public void testWritables() throws Exception {
-    run(WritableTypeFamily.getInstance());
+    run(WritableTypeFamily.getInstance(), temporaryFolder.newFolder().getAbsolutePath());
   }
 
   @Test
   public void testAvros() throws Exception {
-    run(AvroTypeFamily.getInstance());
+    run(AvroTypeFamily.getInstance(), temporaryFolder.newFolder().getAbsolutePath());
   }
 
-  public static void run(PTypeFamily typeFamily) throws Exception {
-    Pipeline pipeline = new MRPipeline(MapsIT.class);
+  public static void run(PTypeFamily typeFamily, String path) throws Exception {
+    Configuration config = new Configuration();
+    config.set(RuntimeParameters.TMP_DIR, path);
+    Pipeline pipeline = new MRPipeline(MapsIT.class, config);
     String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
     PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
     Iterable<Pair<String, Map<String, Long>>> output = shakespeare

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MaterializeIT.java b/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
index 3f30e70..2e12578 100644
--- a/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
+++ b/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
@@ -26,9 +26,12 @@ import java.util.List;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -45,14 +48,19 @@ public class MaterializeIT {
     }
   }
 
+  @Rule
+  public TemporaryPath temporaryPath= new TemporaryPath();
+
   @Test
   public void testMaterializeInput_Writables() throws IOException {
-    runMaterializeInput(new MRPipeline(MaterializeIT.class), WritableTypeFamily.getInstance());
+    runMaterializeInput(new MRPipeline(MaterializeIT.class, temporaryPath.setTempLoc(new Configuration())),
+        WritableTypeFamily.getInstance());
   }
 
   @Test
   public void testMaterializeInput_Avro() throws IOException {
-    runMaterializeInput(new MRPipeline(MaterializeIT.class), AvroTypeFamily.getInstance());
+    runMaterializeInput(new MRPipeline(MaterializeIT.class, temporaryPath.setTempLoc(new Configuration())),
+        AvroTypeFamily.getInstance());
   }
 
   @Test
@@ -67,12 +75,14 @@ public class MaterializeIT {
 
   @Test
   public void testMaterializeEmptyIntermediate_Writables() throws IOException {
-    runMaterializeEmptyIntermediate(new MRPipeline(MaterializeIT.class), WritableTypeFamily.getInstance());
+    runMaterializeEmptyIntermediate(new MRPipeline(MaterializeIT.class, temporaryPath.setTempLoc(new Configuration())),
+        WritableTypeFamily.getInstance());
   }
 
   @Test
   public void testMaterializeEmptyIntermediate_Avro() throws IOException {
-    runMaterializeEmptyIntermediate(new MRPipeline(MaterializeIT.class), AvroTypeFamily.getInstance());
+    runMaterializeEmptyIntermediate(new MRPipeline(MaterializeIT.class, temporaryPath.setTempLoc(new Configuration())),
+        AvroTypeFamily.getInstance());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java b/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java
index db7449a..8177c1c 100644
--- a/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java
+++ b/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java
@@ -25,7 +25,10 @@ import java.util.Map;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
@@ -63,10 +66,12 @@ public class MaterializeToMapIT {
       return Pair.of(k, input);
     }
   }
+  @Rule
+  public TemporaryPath temporaryPath= new TemporaryPath();
 
   @Test
   public void testMRMaterializeToMap() throws IOException {
-    Pipeline p = new MRPipeline(MaterializeToMapIT.class);
+    Pipeline p = new MRPipeline(MaterializeToMapIT.class, temporaryPath.setTempLoc(new Configuration()));
     String inputFile = FileHelper.createTempCopyOf("set1.txt");
     PCollection<String> c = p.readTextFile(inputFile);
     PTypeFamily tf = c.getTypeFamily();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java b/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
index 90925ba..a523505 100644
--- a/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
+++ b/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
@@ -31,6 +31,7 @@ import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -73,12 +74,12 @@ public class MultipleOutputIT {
 
   @Test
   public void testWritables() throws IOException {
-    run(new MRPipeline(MultipleOutputIT.class), WritableTypeFamily.getInstance());
+    run(new MRPipeline(MultipleOutputIT.class, tmpDir.setTempLoc(new Configuration())), WritableTypeFamily.getInstance());
   }
 
   @Test
   public void testAvro() throws IOException {
-    run(new MRPipeline(MultipleOutputIT.class), AvroTypeFamily.getInstance());
+    run(new MRPipeline(MultipleOutputIT.class, tmpDir.setTempLoc(new Configuration())), AvroTypeFamily.getInstance());
   }
 
   public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java b/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
index 4ff8490..6168967 100644
--- a/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
+++ b/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.TemporaryPath;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -61,7 +62,7 @@ public class PCollectionGetSizeIT {
 
   @Test
   public void testGetSizeOfEmptyInput_MRPipeline() throws IOException {
-    testCollectionGetSizeOfEmptyInput(new MRPipeline(this.getClass()));
+    testCollectionGetSizeOfEmptyInput(new MRPipeline(this.getClass(), tmpDir.setTempLoc(new Configuration())));
   }
 
   @Test
@@ -76,7 +77,7 @@ public class PCollectionGetSizeIT {
 
   @Test
   public void testMaterializeEmptyInput_MRPipeline() throws IOException {
-    testMaterializeEmptyInput(new MRPipeline(this.getClass()));
+    testMaterializeEmptyInput(new MRPipeline(this.getClass(), tmpDir.setTempLoc(new Configuration())));
   }
 
   @Test
@@ -91,7 +92,7 @@ public class PCollectionGetSizeIT {
   @Test
   public void testGetSizeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
 
-    PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(new MRPipeline(this.getClass()));
+    PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(new MRPipeline(this.getClass(), tmpDir.setTempLoc(new Configuration())));
 
     assertThat(emptyIntermediate.getSize(), is(0L));
   }
@@ -100,7 +101,7 @@ public class PCollectionGetSizeIT {
   @Ignore("GetSize of a DoCollection is only an estimate based on scale factor, so we can't count on it being reported as 0")
   public void testGetSizeOfEmptyIntermediatePCollection_NoSave_MRPipeline() throws IOException {
 
-    PCollection<String> data = new MRPipeline(this.getClass()).readTextFile(nonEmptyInputPath);
+    PCollection<String> data = new MRPipeline(this.getClass(), tmpDir.setTempLoc(new Configuration())).readTextFile(nonEmptyInputPath);
 
     PCollection<String> emptyPCollection = data.filter(new FalseFilterFn());
 
@@ -118,7 +119,7 @@ public class PCollectionGetSizeIT {
   @Test
   public void testMaterializeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
 
-    PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(new MRPipeline(this.getClass()));
+    PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(new MRPipeline(this.getClass(), tmpDir.setTempLoc(new Configuration())));
 
     assertThat(newArrayList(emptyIntermediate.materialize()).size(), is(0));
   }
@@ -146,7 +147,7 @@ public class PCollectionGetSizeIT {
 
   @Test(expected = IllegalStateException.class)
   public void testExpectExceptionForGettingSizeOfNonExistingFile_MRPipeline() throws IOException {
-    new MRPipeline(this.getClass()).readTextFile("non_existing.file").getSize();
+    new MRPipeline(this.getClass(), tmpDir.setTempLoc(new Configuration())).readTextFile("non_existing.file").getSize();
   }
 
   @Test(expected = IllegalStateException.class)

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java b/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java
index de370b0..94d2792 100644
--- a/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java
+++ b/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java
@@ -28,11 +28,14 @@ import junit.framework.Assert;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -48,10 +51,12 @@ public class PTableKeyValueIT implements Serializable {
   private transient PTypeFamily typeFamily;
   private transient MRPipeline pipeline;
   private transient String inputFile;
+  @Rule
+  public transient TemporaryPath temporaryPath = new TemporaryPath();
 
   @Before
   public void setUp() throws IOException {
-    pipeline = new MRPipeline(PTableKeyValueIT.class);
+    pipeline = new MRPipeline(PTableKeyValueIT.class, temporaryPath.setTempLoc(new Configuration()));
     inputFile = FileHelper.createTempCopyOf("set1.txt");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/PageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PageRankIT.java b/crunch/src/it/java/org/apache/crunch/PageRankIT.java
index 36f1c35..7791936 100644
--- a/crunch/src/it/java/org/apache/crunch/PageRankIT.java
+++ b/crunch/src/it/java/org/apache/crunch/PageRankIT.java
@@ -26,12 +26,15 @@ import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.apache.crunch.util.PTypes;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.Iterables;
@@ -67,11 +70,14 @@ public class PageRankIT {
     }
   }
 
+  @Rule
+  public TemporaryPath temporaryPath = new TemporaryPath();
+
   @Test
   public void testAvroReflect() throws Exception {
     PTypeFamily tf = AvroTypeFamily.getInstance();
     PType<PageRankData> prType = Avros.reflects(PageRankData.class);
-    run(new MRPipeline(PageRankIT.class), prType, tf);
+    run(new MRPipeline(PageRankIT.class, temporaryPath.setTempLoc(new Configuration())), prType, tf);
   }
 
   @Test
@@ -85,14 +91,14 @@ public class PageRankIT {
   public void testAvroJSON() throws Exception {
     PTypeFamily tf = AvroTypeFamily.getInstance();
     PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
-    run(new MRPipeline(PageRankIT.class), prType, tf);
+    run(new MRPipeline(PageRankIT.class, temporaryPath.setTempLoc(new Configuration())), prType, tf);
   }
 
   @Test
   public void testWritablesJSON() throws Exception {
     PTypeFamily tf = WritableTypeFamily.getInstance();
     PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
-    run(new MRPipeline(PageRankIT.class), prType, tf);
+    run(new MRPipeline(PageRankIT.class, temporaryPath.setTempLoc(new Configuration())), prType, tf);
   }
 
   public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input, final float d) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java b/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java
index ee2267d..8e52d03 100644
--- a/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java
+++ b/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java
@@ -32,6 +32,7 @@ import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -42,12 +43,12 @@ public class TermFrequencyIT implements Serializable {
 
   @Test
   public void testTermFrequencyWithNoTransform() throws IOException {
-    run(new MRPipeline(TermFrequencyIT.class), WritableTypeFamily.getInstance(), false);
+    run(new MRPipeline(TermFrequencyIT.class, tmpDir.setTempLoc(new Configuration())), WritableTypeFamily.getInstance(), false);
   }
 
   @Test
   public void testTermFrequencyWithTransform() throws IOException {
-    run(new MRPipeline(TermFrequencyIT.class), WritableTypeFamily.getInstance(), true);
+    run(new MRPipeline(TermFrequencyIT.class, tmpDir.setTempLoc(new Configuration())), WritableTypeFamily.getInstance(), true);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/TextPairIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/TextPairIT.java b/crunch/src/it/java/org/apache/crunch/TextPairIT.java
index 7694bad..3b3a239 100644
--- a/crunch/src/it/java/org/apache/crunch/TextPairIT.java
+++ b/crunch/src/it/java/org/apache/crunch/TextPairIT.java
@@ -24,14 +24,19 @@ import java.io.IOException;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.From;
 import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
 import org.junit.Test;
 
 public class TextPairIT {
+  @Rule
+  public TemporaryPath temporaryPath= new TemporaryPath();
 
   @Test
   public void testWritables() throws IOException {
-    run(new MRPipeline(TextPairIT.class));
+    run(new MRPipeline(TextPairIT.class, temporaryPath.setTempLoc(new Configuration())));
   }
 
   private static final String CANARY = "Writables.STRING_TO_TEXT";

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/TfIdfIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/TfIdfIT.java b/crunch/src/it/java/org/apache/crunch/TfIdfIT.java
index de1d5dc..411985c 100644
--- a/crunch/src/it/java/org/apache/crunch/TfIdfIT.java
+++ b/crunch/src/it/java/org/apache/crunch/TfIdfIT.java
@@ -34,6 +34,7 @@ import org.apache.crunch.lib.Join;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.junit.Rule;
 import org.junit.Test;
@@ -51,12 +52,12 @@ public class TfIdfIT implements Serializable {
 
   @Test
   public void testWritablesSingleRun() throws IOException {
-    run(new MRPipeline(TfIdfIT.class), WritableTypeFamily.getInstance(), true);
+    run(new MRPipeline(TfIdfIT.class, tmpDir.setTempLoc(new Configuration())), WritableTypeFamily.getInstance(), true);
   }
 
   @Test
   public void testWritablesMultiRun() throws IOException {
-    run(new MRPipeline(TfIdfIT.class), WritableTypeFamily.getInstance(), false);
+    run(new MRPipeline(TfIdfIT.class, tmpDir.setTempLoc(new Configuration())), WritableTypeFamily.getInstance(), false);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java b/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
index f845214..48a8ef6 100644
--- a/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
+++ b/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
@@ -29,6 +29,7 @@ import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -62,12 +63,12 @@ public class TupleNClassCastBugIT {
 
   @Test
   public void testWritables() throws IOException {
-    run(new MRPipeline(TupleNClassCastBugIT.class), WritableTypeFamily.getInstance());
+    run(new MRPipeline(TupleNClassCastBugIT.class, tmpDir.setTempLoc(new Configuration())), WritableTypeFamily.getInstance());
   }
 
   @Test
   public void testAvro() throws IOException {
-    run(new MRPipeline(TupleNClassCastBugIT.class), AvroTypeFamily.getInstance());
+    run(new MRPipeline(TupleNClassCastBugIT.class, tmpDir.setTempLoc(new Configuration())), AvroTypeFamily.getInstance());
   }
 
   public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/WordCountIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/WordCountIT.java b/crunch/src/it/java/org/apache/crunch/WordCountIT.java
index 5ba931f..fe0c98d 100644
--- a/crunch/src/it/java/org/apache/crunch/WordCountIT.java
+++ b/crunch/src/it/java/org/apache/crunch/WordCountIT.java
@@ -33,6 +33,7 @@ import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -79,31 +80,31 @@ public class WordCountIT {
 
   @Test
   public void testWritables() throws IOException {
-    run(new MRPipeline(WordCountIT.class), WritableTypeFamily.getInstance());
+    run(new MRPipeline(WordCountIT.class, tmpDir.setTempLoc(new Configuration())), WritableTypeFamily.getInstance());
   }
 
   @Test
   public void testWritablesWithSecond() throws IOException {
     runSecond = true;
-    run(new MRPipeline(WordCountIT.class), WritableTypeFamily.getInstance());
+    run(new MRPipeline(WordCountIT.class, tmpDir.setTempLoc(new Configuration())), WritableTypeFamily.getInstance());
   }
 
   @Test
   public void testWritablesWithSecondUseToOutput() throws IOException {
     runSecond = true;
     useToOutput = true;
-    run(new MRPipeline(WordCountIT.class), WritableTypeFamily.getInstance());
+    run(new MRPipeline(WordCountIT.class, tmpDir.setTempLoc(new Configuration())), WritableTypeFamily.getInstance());
   }
 
   @Test
   public void testAvro() throws IOException {
-    run(new MRPipeline(WordCountIT.class), AvroTypeFamily.getInstance());
+    run(new MRPipeline(WordCountIT.class, tmpDir.setTempLoc(new Configuration())), AvroTypeFamily.getInstance());
   }
 
   @Test
   public void testAvroWithSecond() throws IOException {
     runSecond = true;
-    run(new MRPipeline(WordCountIT.class), AvroTypeFamily.getInstance());
+    run(new MRPipeline(WordCountIT.class, tmpDir.setTempLoc(new Configuration())), AvroTypeFamily.getInstance());
   }
 
   @Test
@@ -117,7 +118,7 @@ public class WordCountIT {
   }
 
   public void runWithTop(PTypeFamily tf) throws IOException {
-    Pipeline pipeline = new MRPipeline(WordCountIT.class);
+    Pipeline pipeline = new MRPipeline(WordCountIT.class, tmpDir.setTempLoc(new Configuration()));
     String inputPath = tmpDir.copyResourceFileName("shakes.txt");
 
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, tf.strings()));

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java b/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
index 9748143..56dc10d 100644
--- a/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
+++ b/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
@@ -41,6 +41,7 @@ import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -64,12 +65,18 @@ public class UnionCollectionIT {
 
   private ArrayList<String> EXPECTED = Lists.newArrayList("a", "a", "b", "c", "c", "d", "e");
 
+  private Class pipelineClass;
+
   @Before
   @SuppressWarnings("unchecked")
   public void setUp() throws IOException {
     String inputFile1 = tmpDir.copyResourceFileName("set1.txt");
     String inputFile2 = tmpDir.copyResourceFileName("set2.txt");
-
+    if (pipelineClass == null) {
+      pipeline = MemPipeline.getInstance();
+    } else {
+      pipeline = new MRPipeline(pipelineClass, tmpDir.setTempLoc(new Configuration()));
+    }
     PCollection<String> firstCollection = pipeline.read(At.textFile(inputFile1, typeFamily.strings()));
     PCollection<String> secondCollection = pipeline.read(At.textFile(inputFile2, typeFamily.strings()));
 
@@ -80,27 +87,21 @@ public class UnionCollectionIT {
     union = secondCollection.union(firstCollection);
   }
 
-  @After
-  public void tearDown() {
-    pipeline.done();
-  }
-
   @Parameters
   public static Collection<Object[]> data() throws IOException {
-    Object[][] data = new Object[][] { { WritableTypeFamily.getInstance(), new MRPipeline(PTableKeyValueIT.class) },
-        { WritableTypeFamily.getInstance(), MemPipeline.getInstance() },
-        { AvroTypeFamily.getInstance(), new MRPipeline(PTableKeyValueIT.class) },
-        { AvroTypeFamily.getInstance(), MemPipeline.getInstance() } };
+    Object[][] data = new Object[][] { { WritableTypeFamily.getInstance(), PTableKeyValueIT.class },
+        { WritableTypeFamily.getInstance(), null }, { AvroTypeFamily.getInstance(), PTableKeyValueIT.class },
+        { AvroTypeFamily.getInstance(), null } };
     return Arrays.asList(data);
   }
 
-  public UnionCollectionIT(PTypeFamily typeFamily, Pipeline pipeline) {
+  public UnionCollectionIT(PTypeFamily typeFamily, Class pipelineClass) {
     this.typeFamily = typeFamily;
-    this.pipeline = pipeline;
+    this.pipelineClass = pipelineClass;
   }
 
   @Test
-  public void unionMaterializeShouldNotThrowNPE() {
+  public void unionMaterializeShouldNotThrowNPE() throws Exception {
     checkMaterialized(union.materialize());
     checkMaterialized(pipeline.materialize(union));
   }
@@ -116,7 +117,6 @@ public class UnionCollectionIT {
 
   @Test
   public void unionWriteShouldNotThrowNPE() throws IOException {
-
     String outputPath1 = tmpDir.getFileName("output1");
     String outputPath2 = tmpDir.getFileName("output2");
     String outputPath3 = tmpDir.getFileName("output3");
@@ -142,7 +142,6 @@ public class UnionCollectionIT {
       checkFileContents(outputPath2);
       checkFileContents(outputPath3);
     }
-
   }
 
   private void checkFileContents(String filePath) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
index 603086c..f82cf44 100644
--- a/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
+++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
@@ -38,9 +38,12 @@ import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.io.avro.AvroFileReaderFactoryTest.PojoPerson;
 import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -49,6 +52,8 @@ import com.google.common.collect.Lists;
 public class AvroFileSourceTargetIT implements Serializable {
 
   private transient File avroFile;
+  @Rule
+  public transient TemporaryPath temporaryPath= new TemporaryPath();
 
   @Before
   public void setUp() throws IOException {
@@ -84,7 +89,7 @@ public class AvroFileSourceTargetIT implements Serializable {
     savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
     populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
 
-    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class);
+    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, temporaryPath.setTempLoc(new Configuration()));
     PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
         Avros.records(Person.class)));
 
@@ -112,7 +117,7 @@ public class AvroFileSourceTargetIT implements Serializable {
     savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
     populateGenericFile(Lists.newArrayList(savedRecord), genericPersonSchema);
 
-    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class);
+    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, temporaryPath.setTempLoc(new Configuration()));
     PCollection<Record> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
         Avros.generics(genericPersonSchema)));
 
@@ -128,7 +133,7 @@ public class AvroFileSourceTargetIT implements Serializable {
     savedRecord.put("name", "John Doe");
     populateGenericFile(Lists.newArrayList(savedRecord), pojoPersonSchema);
 
-    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class);
+    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, temporaryPath.setTempLoc(new Configuration()));
     PCollection<PojoPerson> personCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
         Avros.reflects(PojoPerson.class)));
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
index 2c6cbd7..94c4223 100644
--- a/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
+++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
@@ -28,7 +28,10 @@ import org.apache.crunch.PCollection;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -85,10 +88,12 @@ public class AvroReflectIT implements Serializable {
     }
 
   }
+  @Rule
+  public transient TemporaryPath temporaryPath= new TemporaryPath();
 
   @Test
   public void testReflection() throws IOException {
-    Pipeline pipeline = new MRPipeline(AvroReflectIT.class);
+    Pipeline pipeline = new MRPipeline(AvroReflectIT.class, temporaryPath.setTempLoc(new Configuration()));
     PCollection<StringWrapper> stringWrapperCollection = pipeline.readTextFile(FileHelper.createTempCopyOf("set1.txt"))
         .parallelDo(new MapFn<String, StringWrapper>() {
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java b/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
index dfb296e..380df33 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
@@ -35,13 +35,16 @@ import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.Employee;
 import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
@@ -49,10 +52,12 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 public class AggregateIT {
+  @Rule
+  public TemporaryPath temporaryPath= new TemporaryPath();
 
   @Test
   public void testWritables() throws Exception {
-    Pipeline pipeline = new MRPipeline(AggregateIT.class);
+    Pipeline pipeline = new MRPipeline(AggregateIT.class, temporaryPath.setTempLoc(new Configuration()));
     String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
     PCollection<String> shakes = pipeline.readTextFile(shakesInputPath);
     runMinMax(shakes, WritableTypeFamily.getInstance());
@@ -61,7 +66,7 @@ public class AggregateIT {
 
   @Test
   public void testAvro() throws Exception {
-    Pipeline pipeline = new MRPipeline(AggregateIT.class);
+    Pipeline pipeline = new MRPipeline(AggregateIT.class, temporaryPath.setTempLoc(new Configuration()));
     String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
     PCollection<String> shakes = pipeline.readTextFile(shakesInputPath);
     runMinMax(shakes, AvroTypeFamily.getInstance());
@@ -104,7 +109,7 @@ public class AggregateIT {
 
   @Test
   public void testCollectUrls() throws Exception {
-    Pipeline p = new MRPipeline(AggregateIT.class);
+    Pipeline p = new MRPipeline(AggregateIT.class, temporaryPath.setTempLoc(new Configuration()));
     String urlsInputPath = FileHelper.createTempCopyOf("urls.txt");
     PTable<String, Collection<String>> urls = Aggregate.collectValues(p.readTextFile(urlsInputPath).parallelDo(
         new SplitFn(), tableOf(strings(), strings())));
@@ -137,7 +142,7 @@ public class AggregateIT {
 
   @Test
   public void testCollectValues_Writables() throws IOException {
-    Pipeline pipeline = new MRPipeline(AggregateIT.class);
+    Pipeline pipeline = new MRPipeline(AggregateIT.class, temporaryPath.setTempLoc(new Configuration()));
     Map<Integer, Collection<Text>> collectionMap = pipeline.readTextFile(FileHelper.createTempCopyOf("set2.txt"))
         .parallelDo(new MapStringToTextPair(), Writables.tableOf(Writables.ints(), Writables.writables(Text.class)))
         .collectValues().materializeToMap();
@@ -151,7 +156,7 @@ public class AggregateIT {
   public void testCollectValues_Avro() throws IOException {
 
     MapStringToEmployeePair mapFn = new MapStringToEmployeePair();
-    Pipeline pipeline = new MRPipeline(AggregateIT.class);
+    Pipeline pipeline = new MRPipeline(AggregateIT.class, temporaryPath.setTempLoc(new Configuration()));
     Map<Integer, Collection<Employee>> collectionMap = pipeline.readTextFile(FileHelper.createTempCopyOf("set2.txt"))
         .parallelDo(mapFn, Avros.tableOf(Avros.ints(), Avros.records(Employee.class))).collectValues()
         .materializeToMap();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java b/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
index 9f1e35c..8ee4af7 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
@@ -35,8 +35,11 @@ import org.apache.crunch.PCollection;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -49,6 +52,8 @@ public class AvroTypeSortIT implements Serializable {
   private static final long serialVersionUID = 1344118240353796561L;
 
   private transient File avroFile;
+  @Rule
+  public transient TemporaryPath temporaryPath= new TemporaryPath();
 
   @Before
   public void setUp() throws IOException {
@@ -63,7 +68,7 @@ public class AvroTypeSortIT implements Serializable {
   @Test
   public void testSortAvroTypesBySelectedFields() throws Exception {
 
-    MRPipeline pipeline = new MRPipeline(AvroTypeSortIT.class);
+    MRPipeline pipeline = new MRPipeline(AvroTypeSortIT.class, temporaryPath.setTempLoc(new Configuration()));
 
     Person ccc10 = createPerson("CCC", 10);
     Person bbb20 = createPerson("BBB", 20);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
index 4f65964..ae952d3 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
@@ -41,6 +41,7 @@ import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -93,12 +94,12 @@ public class CogroupIT {
 
   @Test
   public void testWritableJoin() throws Exception {
-    run(new MRPipeline(CogroupIT.class), WritableTypeFamily.getInstance());
+    run(new MRPipeline(CogroupIT.class, tmpDir.setTempLoc(new Configuration())), WritableTypeFamily.getInstance());
   }
 
   @Test
   public void testAvroJoin() throws Exception {
-    run(new MRPipeline(CogroupIT.class), AvroTypeFamily.getInstance());
+    run(new MRPipeline(CogroupIT.class, tmpDir.setTempLoc(new Configuration())), AvroTypeFamily.getInstance());
   }
 
   public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/lib/SetIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SetIT.java b/crunch/src/it/java/org/apache/crunch/lib/SetIT.java
index 4b07fa2..bde3788 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/SetIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/SetIT.java
@@ -31,11 +31,14 @@ import org.apache.crunch.Tuple3;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -55,6 +58,9 @@ public class SetIT {
   public SetIT(PTypeFamily typeFamily) {
     this.typeFamily = typeFamily;
   }
+  
+  @Rule
+  public TemporaryPath temporaryPath= new TemporaryPath();
 
   @Parameters
   public static Collection<Object[]> data() {
@@ -66,7 +72,7 @@ public class SetIT {
   public void setUp() throws IOException {
     String set1InputPath = FileHelper.createTempCopyOf("set1.txt");
     String set2InputPath = FileHelper.createTempCopyOf("set2.txt");
-    pipeline = new MRPipeline(SetIT.class);
+    pipeline = new MRPipeline(SetIT.class, temporaryPath.setTempLoc(new Configuration()));
     set1 = pipeline.read(At.textFile(set1InputPath, typeFamily.strings()));
     set2 = pipeline.read(At.textFile(set2InputPath, typeFamily.strings()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
index a0357c1..161adfc 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
@@ -43,109 +43,114 @@ import org.apache.crunch.lib.Sort.ColumnOrder;
 import org.apache.crunch.lib.Sort.Order;
 import org.apache.crunch.test.FileHelper;
 import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
 
 public class SortIT implements Serializable {
+  @Rule
+  public transient TemporaryPath temporaryPath= new TemporaryPath();
 
   @Test
   public void testWritableSortAsc() throws Exception {
-    runSingle(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), Order.ASCENDING,
+    runSingle(new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration())), WritableTypeFamily.getInstance(), Order.ASCENDING,
         "A\tand this text as well");
   }
 
   @Test
   public void testWritableSortDesc() throws Exception {
-    runSingle(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), Order.DESCENDING,
+    runSingle(new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration())), WritableTypeFamily.getInstance(), Order.DESCENDING,
         "B\tthis doc has some text");
   }
 
   @Test
   public void testWritableSortAscDesc() throws Exception {
-    runPair(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
+    runPair(new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration())), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
         "this doc has this text");
   }
 
   @Test
   public void testWritableSortSecondDescFirstDesc() throws Exception {
-    runPair(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
+    runPair(new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration())), WritableTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
         "this doc has this text");
   }
 
   @Test
   public void testWritableSortTripleAscDescAsc() throws Exception {
-    runTriple(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+    runTriple(new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration())), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
         by(3, ASCENDING), "A", "this", "doc");
   }
 
   @Test
   public void testWritableSortQuadAscDescAscDesc() throws Exception {
-    runQuad(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+    runQuad(new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration())), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
         by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
   }
 
   @Test
   public void testWritableSortTupleNAscDesc() throws Exception {
-    runTupleN(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(),
+    runTupleN(new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration())), WritableTypeFamily.getInstance(),
         new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
   }
 
   @Test
   public void testWritableSortTable() throws Exception {
-    runTable(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), "A");
+    runTable(new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration())), WritableTypeFamily.getInstance(), "A");
   }
 
   @Test
   public void testAvroSortAsc() throws Exception {
-    runSingle(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), Order.ASCENDING, "A\tand this text as well");
+    runSingle(new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration())), AvroTypeFamily.getInstance(), Order.ASCENDING, "A\tand this text as well");
   }
 
   @Test
   public void testAvroSortDesc() throws Exception {
-    runSingle(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), Order.DESCENDING, "B\tthis doc has some text");
+    runSingle(new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration())), AvroTypeFamily.getInstance(), Order.DESCENDING, "B\tthis doc has some text");
   }
 
   @Test
   public void testAvroSortPairAscAsc() throws Exception {
-    runPair(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
+    runPair(new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration())), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
         "this doc has this text");
   }
 
   @Test
   @Ignore("Avro sorting only works in field order at the moment")
   public void testAvroSortPairSecondAscFirstDesc() throws Exception {
-    runPair(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
+    runPair(new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration())), AvroTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
         "this doc has this text");
   }
 
   @Test
   public void testAvroSortTripleAscDescAsc() throws Exception {
-    runTriple(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+    runTriple(new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration())), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
         by(3, ASCENDING), "A", "this", "doc");
   }
 
   @Test
   public void testAvroSortQuadAscDescAscDesc() throws Exception {
-    runQuad(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+    runQuad(new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration())), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
         by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
   }
 
   @Test
   public void testAvroSortTupleNAscDesc() throws Exception {
-    runTupleN(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(),
+    runTupleN(new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration())), AvroTypeFamily.getInstance(),
         new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
   }
 
   @Test
   public void testAvroReflectSortPair() throws IOException {
-    Pipeline pipeline = new MRPipeline(SortIT.class);
+    Pipeline pipeline = new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration()));
     PCollection<Pair<String, StringWrapper>> sorted = pipeline.readTextFile(FileHelper.createTempCopyOf("set2.txt"))
         .parallelDo(new MapFn<String, Pair<String, StringWrapper>>() {
 
@@ -165,7 +170,7 @@ public class SortIT implements Serializable {
 
   @Test
   public void testAvroReflectSortTable() throws IOException {
-    Pipeline pipeline = new MRPipeline(SortIT.class);
+    Pipeline pipeline = new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration()));
     PTable<String, StringWrapper> unsorted = pipeline.readTextFile(FileHelper.createTempCopyOf("set2.txt")).parallelDo(
         new MapFn<String, Pair<String, StringWrapper>>() {
 
@@ -187,7 +192,7 @@ public class SortIT implements Serializable {
 
   @Test
   public void testAvroSortTable() throws Exception {
-    runTable(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), "A");
+    runTable(new MRPipeline(SortIT.class, temporaryPath.setTempLoc(new Configuration())), AvroTypeFamily.getInstance(), "A");
   }
 
   private void runSingle(Pipeline pipeline, PTypeFamily typeFamily, Order order, String firstLine) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java b/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
index 7165ba2..eda9c95 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
@@ -36,10 +36,13 @@ import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.Person.Builder;
+import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.avro.SafeAvroSerialization;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -52,6 +55,9 @@ public class SpecificAvroGroupByIT implements Serializable {
   private static final long serialVersionUID = 1344118240353796561L;
 
   private transient File avroFile;
+  @Rule
+  public transient TemporaryPath temporaryPath= new TemporaryPath();
+
 
   @Before
   public void setUp() throws IOException {
@@ -66,14 +72,14 @@ public class SpecificAvroGroupByIT implements Serializable {
   @Test
   public void testGrouByWithSpecificAvroType() throws Exception {
 
-    MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class);
+    MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class, temporaryPath.setTempLoc(new Configuration()));
 
     testSpecificAvro(pipeline);
   }
 
   @Test
   public void testGrouByOnSpecificAvroButReflectionDatumReader() throws Exception {
-    MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class);
+    MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class, temporaryPath.setTempLoc(new Configuration()));
 
     // https://issues.apache.org/jira/browse/AVRO-1046 resolves
     // the ClassCastException when reading specific Avro types with

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java b/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
index 5700740..b3db6d5 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
@@ -30,10 +30,13 @@ import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.lib.Join;
 import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
 import org.junit.Test;
 
 public abstract class JoinTester implements Serializable {
@@ -78,15 +81,17 @@ public abstract class JoinTester implements Serializable {
 
     pipeline.done();
   }
+  @Rule
+  public transient TemporaryPath temporaryPath= new TemporaryPath();
 
   @Test
   public void testWritableJoin() throws Exception {
-    run(new MRPipeline(InnerJoinIT.class), WritableTypeFamily.getInstance());
+    run(new MRPipeline(InnerJoinIT.class, temporaryPath.setTempLoc(new Configuration())), WritableTypeFamily.getInstance());
   }
 
   @Test
   public void testAvroJoin() throws Exception {
-    run(new MRPipeline(InnerJoinIT.class), AvroTypeFamily.getInstance());
+    run(new MRPipeline(InnerJoinIT.class, temporaryPath.setTempLoc(new Configuration())), AvroTypeFamily.getInstance());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
index f7f8f09..66a9c75 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
@@ -33,7 +33,10 @@ import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
 import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -58,6 +61,8 @@ public class MapsideJoinIT {
     }
 
   }
+  @Rule
+  public TemporaryPath temporaryPath= new TemporaryPath();
 
   @Test(expected = CrunchRuntimeException.class)
   public void testNonMapReducePipeline() {
@@ -66,7 +71,7 @@ public class MapsideJoinIT {
 
   @Test
   public void testMapsideJoin_RightSideIsEmpty() throws IOException {
-    MRPipeline pipeline = new MRPipeline(MapsideJoinIT.class);
+    MRPipeline pipeline = new MRPipeline(MapsideJoinIT.class, temporaryPath.setTempLoc(new Configuration()));
     PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
     PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
 
@@ -83,7 +88,7 @@ public class MapsideJoinIT {
 
   @Test
   public void testMapsideJoin() throws IOException {
-    runMapsideJoin(new MRPipeline(MapsideJoinIT.class));
+    runMapsideJoin(new MRPipeline(MapsideJoinIT.class, temporaryPath.setTempLoc(new Configuration())));
   }
 
   private void runMapsideJoin(Pipeline pipeline) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java
index 2531d24..24427bd 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java
@@ -37,8 +37,11 @@ import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.From;
 import org.apache.crunch.test.Employee;
 import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
@@ -48,6 +51,8 @@ public class MultiAvroSchemaJoinIT {
 
   private File personFile;
   private File employeeFile;
+  @Rule
+  public TemporaryPath temporaryPath= new TemporaryPath();
 
   @Before
   public void setUp() throws Exception {
@@ -102,7 +107,7 @@ public class MultiAvroSchemaJoinIT {
 
   @Test
   public void testJoin() throws Exception {
-    Pipeline p = new MRPipeline(MultiAvroSchemaJoinIT.class);
+    Pipeline p = new MRPipeline(MultiAvroSchemaJoinIT.class, temporaryPath.setTempLoc(new Configuration()));
     PCollection<Person> people = p.read(From.avroFile(personFile.getAbsolutePath(), records(Person.class)));
     PCollection<Employee> employees = p.read(From.avroFile(employeeFile.getAbsolutePath(), records(Employee.class)));
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/it/java/org/apache/crunch/test/TemporaryPath.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/test/TemporaryPath.java b/crunch/src/it/java/org/apache/crunch/test/TemporaryPath.java
index 9218213..7bdc91a 100644
--- a/crunch/src/it/java/org/apache/crunch/test/TemporaryPath.java
+++ b/crunch/src/it/java/org/apache/crunch/test/TemporaryPath.java
@@ -3,7 +3,10 @@ package org.apache.crunch.test;
 import java.io.File;
 import java.io.IOException;
 
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
 import org.junit.rules.ExternalResource;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.Description;
@@ -12,64 +15,62 @@ import org.junit.runners.model.Statement;
 import com.google.common.io.Files;
 import com.google.common.io.Resources;
 
-
 /**
- * Creates a temporary directory for a test case and destroys it 
- * afterwards. This provides a temporary directory like JUnit's 
- * {@link TemporaryFolder} but geared towards Hadoop applications.
- * Unlike {@link TemporaryFolder}, it doesn't create any files or
- * directories except for the root directory itself.
+ * Creates a temporary directory for a test case and destroys it afterwards.
+ * This provides a temporary directory like JUnit's {@link TemporaryFolder} but
+ * geared towards Hadoop applications. Unlike {@link TemporaryFolder}, it
+ * doesn't create any files or directories except for the root directory itself.
  */
 public final class TemporaryPath extends ExternalResource {
   private TemporaryFolder tmp = new TemporaryFolder();
-  
+
   @Override
   public Statement apply(Statement base, Description description) {
     return tmp.apply(base, description);
   }
-  
+
   /**
    * Get the root directory which will be deleted automatically.
    */
   public File getRootFile() {
     return tmp.getRoot();
   }
-  
+
   /**
    * Get the root directory as a {@link Path}.
    */
   public Path getRootPath() {
     return toPath(tmp.getRoot());
   }
-  
+
   /**
    * Get the root directory as an absolute file name.
    */
   public String getRootFileName() {
     return tmp.getRoot().getAbsolutePath();
   }
-  
+
   /**
    * Get a {@link File} below the temporary directory.
    */
   public File getFile(String fileName) {
     return new File(getRootFile(), fileName);
   }
-  
+
   /**
    * Get a {@link Path} below the temporary directory.
    */
   public Path getPath(String fileName) {
     return toPath(getFile(fileName));
   }
-  
+
   /**
    * Get an absolute file name below the temporary directory.
    */
   public String getFileName(String fileName) {
     return getFile(fileName).getAbsolutePath();
   }
-  
+
   /**
    * Copy a classpath resource to {@link File}.
    */
@@ -92,14 +93,17 @@ public final class TemporaryPath extends ExternalResource {
   public String copyResourceFileName(String resourceName) throws IOException {
     return copyResourceFile(resourceName).getAbsolutePath();
   }
-  
+
   private void copy(String resourceName, File dest) throws IOException {
-    Files.copy(
-        Resources.newInputStreamSupplier(Resources.getResource(resourceName)),
-        dest);
+    Files.copy(Resources.newInputStreamSupplier(Resources.getResource(resourceName)), dest);
   }
-  
+
   private static Path toPath(File file) {
     return new Path(file.getAbsolutePath());
   }
+
+  public Configuration setTempLoc(Configuration config) throws IOException {
+    config.set(RuntimeParameters.TMP_DIR, tmp.newFolder().getAbsolutePath());
+    return config;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/627bb42d/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 29a1963..ec03781 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -68,7 +68,7 @@ public class MRPipeline implements Pipeline {
   private final String name;
   private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
   private final Map<PCollectionImpl<?>, MaterializableIterable<?>> outputTargetsToMaterialize;
-  private final Path tempDirectory;
+  private Path tempDirectory;
   private int tempFileIndex;
   private int nextAnonymousStageId;
 
@@ -105,6 +105,7 @@ public class MRPipeline implements Pipeline {
   @Override
   public void setConfiguration(Configuration conf) {
     this.conf = conf;
+    this.tempDirectory = createTempDirectory(conf);
   }
 
   @Override


Mime
View raw message