crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [2/2] git commit: CRUNCH-278: Refactor MapsideJoin logic and introduce ReadableData abstraction for working with in-memory datasets in Crunch jobs.
Date Wed, 23 Oct 2013 13:21:17 GMT
CRUNCH-278: Refactor MapsideJoin logic and introduce ReadableData abstraction for working
with in-memory datasets in Crunch jobs.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/655df3c4
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/655df3c4
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/655df3c4

Branch: refs/heads/master
Commit: 655df3c45d3d49bf3787d485d94af4d68d3ccc86
Parents: ff56d05
Author: Josh Wills <jwills@apache.org>
Authored: Wed Oct 9 22:17:39 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Tue Oct 22 11:32:18 2013 -0700

----------------------------------------------------------------------
 .../crunch/lib/join/MapsideJoinStrategyIT.java  |  44 +++++--
 .../java/org/apache/crunch/PCollection.java     |   8 ++
 .../org/apache/crunch/ParallelDoOptions.java    |  17 ++-
 .../java/org/apache/crunch/ReadableData.java    |  55 +++++++++
 .../crunch/impl/mem/collect/MemCollection.java  |   6 +
 .../impl/mem/collect/MemReadableData.java       |  52 ++++++++
 .../impl/mr/collect/DelegatingReadableData.java |  67 +++++++++++
 .../impl/mr/collect/DoCollectionImpl.java       |   9 ++
 .../crunch/impl/mr/collect/DoFnIterator.java    |  98 +++++++++++++++
 .../crunch/impl/mr/collect/DoTableImpl.java     |   9 ++
 .../crunch/impl/mr/collect/InputCollection.java |  12 ++
 .../crunch/impl/mr/collect/InputTable.java      |   7 ++
 .../crunch/impl/mr/collect/PCollectionImpl.java |  22 ++++
 .../impl/mr/collect/PGroupedTableImpl.java      |   6 +
 .../crunch/impl/mr/collect/UnionCollection.java |  15 +++
 .../impl/mr/collect/UnionReadableData.java      |  64 ++++++++++
 .../crunch/impl/mr/collect/UnionTable.java      |  14 +++
 .../org/apache/crunch/io/ReadableSource.java    |   7 ++
 .../apache/crunch/io/avro/AvroFileSource.java   |   6 +
 .../apache/crunch/io/avro/AvroReadableData.java |  40 +++++++
 .../crunch/io/avro/trevni/TrevniKeySource.java  |   7 ++
 .../io/avro/trevni/TrevniReadableData.java      |  39 ++++++
 .../apache/crunch/io/impl/ReadableDataImpl.java | 103 ++++++++++++++++
 .../io/impl/ReadableSourcePathTargetImpl.java   |  10 ++
 .../io/impl/ReadableSourceTargetImpl.java       |  10 ++
 .../io/parquet/AvroParquetFileSource.java       |   6 +
 .../io/parquet/AvroParquetReadableData.java     |  40 +++++++
 .../crunch/io/seq/SeqFileReadableData.java      |  39 ++++++
 .../org/apache/crunch/io/seq/SeqFileSource.java |   6 +
 .../crunch/io/seq/SeqFileTableSource.java       |   6 +
 .../apache/crunch/io/text/NLineFileSource.java  |   6 +
 .../apache/crunch/io/text/TextFileSource.java   |   6 +
 .../crunch/io/text/TextFileTableSource.java     |   6 +
 .../apache/crunch/io/text/TextReadableData.java |  51 ++++++++
 .../crunch/lib/join/MapsideJoinStrategy.java    | 118 +++++++------------
 .../impl/mr/collect/DoCollectionImplTest.java   |   6 +
 .../org/apache/crunch/io/hbase/HBaseData.java   |  69 +++++++++++
 .../crunch/io/hbase/HBaseSourceTarget.java      |  81 ++++---------
 .../crunch/io/hbase/HFileReadableData.java      |  37 ++++++
 .../org/apache/crunch/io/hbase/HFileSource.java |   6 +
 .../apache/crunch/io/hbase/HTableIterable.java  |  48 ++++++++
 .../apache/crunch/io/hbase/HTableIterator.java  |  70 +++++++++++
 42 files changed, 1178 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
index b5ce331..9972549 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
@@ -91,12 +91,22 @@ public class MapsideJoinStrategyIT {
 
   @Test
   public void testMapSideJoin_MemPipeline() {
-    runMapsideJoin(MemPipeline.getInstance(), true);
+    runMapsideJoin(MemPipeline.getInstance(), true, false);
+  }
+
+  @Test
+  public void testMapSideJoin_MemPipeline_Materialized() {
+    runMapsideJoin(MemPipeline.getInstance(), true, true);
   }
   
   @Test
   public void testMapSideJoinLeftOuterJoin_MemPipeline() {
-    runMapsideLeftOuterJoin(MemPipeline.getInstance(), true);
+    runMapsideLeftOuterJoin(MemPipeline.getInstance(), true, false);
+  }
+
+  @Test
+  public void testMapSideJoinLeftOuterJoin_MemPipeline_Materialized() {
+    runMapsideLeftOuterJoin(MemPipeline.getInstance(), true, true);
   }
 
   @Test
@@ -119,24 +129,33 @@ public class MapsideJoinStrategyIT {
 
   @Test
   public void testMapsideJoin() throws IOException {
-    runMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false);
+    runMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false, false);
   }
-  
+
+  @Test
+  public void testMapsideJoin_Materialized() throws IOException {
+    runMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false, true);
+  }
+
   @Test
   public void testMapsideJoin_LeftOuterJoin() throws IOException {
-    runMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false);
+    runMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false, false);
+  }
+
+  @Test
+  public void testMapsideJoin_LeftOuterJoin_Materialized() throws IOException {
+    runMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false, true);
   }
 
-  private void runMapsideJoin(Pipeline pipeline, boolean inMemory) {
+  private void runMapsideJoin(Pipeline pipeline, boolean inMemory, boolean materialize) {
     PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
     PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
     
-    JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>();
+    JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(materialize);
     PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable, JoinType.INNER_JOIN)
         .mapValues("concat", new ConcatValuesFn(), Writables.strings());
 
     PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType());
-    
     PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders, ORDER_TABLE, JoinType.INNER_JOIN);
 
     List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
@@ -150,7 +169,7 @@ public class MapsideJoinStrategyIT {
     
     PipelineResult res = pipeline.run();
     if (!inMemory) {
-      assertEquals(2, res.getStageResults().size());
+      assertEquals(materialize ? 2 : 1, res.getStageResults().size());
     }
      
     List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
@@ -159,16 +178,15 @@ public class MapsideJoinStrategyIT {
     assertEquals(expectedJoinResult, joinedResultList);
   }
   
-  private void runMapsideLeftOuterJoin(Pipeline pipeline, boolean inMemory) {
+  private void runMapsideLeftOuterJoin(Pipeline pipeline, boolean inMemory, boolean materialize) {
     PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
     PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
     
-    JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>();
+    JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(materialize);
     PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable, JoinType.LEFT_OUTER_JOIN)
         .mapValues("concat", new ConcatValuesFn(), Writables.strings());
 
     PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType());
-    
     PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders, ORDER_TABLE, JoinType.LEFT_OUTER_JOIN);
 
     List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
@@ -183,7 +201,7 @@ public class MapsideJoinStrategyIT {
     
     PipelineResult res = pipeline.run();
     if (!inMemory) {
-      assertEquals(2, res.getStageResults().size());
+      assertEquals(materialize ? 2 : 1, res.getStageResults().size());
     }
      
     List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PCollection.java b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
index 6f5abf6..ee8052a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
@@ -166,6 +166,14 @@ public interface PCollection<S> {
   PObject<Collection<S>> asCollection();
 
   /**
+   * @return A reference to the data in this instance that can be read from a job running
+   * on a cluster.
+   *
+   * @param materialize If true, materialize this data before returning a reference to it
+   */
+  ReadableData<S> asReadable(boolean materialize);
+
+  /**
    * Returns the {@code PType} of this {@code PCollection}.
    */
   PType<S> getPType();

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java b/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java
index 4c5411d..65d0df2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java
+++ b/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -68,7 +69,21 @@ public class ParallelDoOptions {
       this.sourceTargets = Sets.newHashSet();
       this.extraConf = Maps.newHashMap();
     }
-    
+
+    public Builder sources(Source<?>... sources) {
+      return sources(Arrays.asList(sources));
+    }
+
+    public Builder sources(Collection<Source<?>> sources) {
+      for (Source<?> src : sources) {
+        // Only SourceTargets need to be checked for materialization
+        if (src instanceof SourceTarget) {
+          sourceTargets.add((SourceTarget) src);
+        }
+      }
+      return this;
+    }
+
     public Builder sourceTargets(SourceTarget<?>... sourceTargets) {
       Collections.addAll(this.sourceTargets, sourceTargets);
       return this;

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/ReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/ReadableData.java b/crunch-core/src/main/java/org/apache/crunch/ReadableData.java
new file mode 100644
index 0000000..2f16860
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/ReadableData.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Represents the contents of a data source that can be read on the cluster from within one
+ * of the tasks running as part of a Crunch pipeline.
+ */
+public interface ReadableData<T> extends Serializable {
+
+  /**
+   * @return Any {@code SourceTarget} instances that must exist before the data in
+   * this instance can be read. Used by the planner in sequencing job processing.
+   */
+  Set<SourceTarget<?>> getSourceTargets();
+
+  /**
+   * Allows this instance to specify any additional configuration settings that may
+   * be needed by the job that it is launched in.
+   *
+   * @param conf The {@code Configuration} object for the job
+   */
+  void configure(Configuration conf);
+
+  /**
+   * Read the data referenced by this instance within the given context.
+   *
+   * @param context The context of the task that is reading the data
+   * @return An iterable reference to the data in this instance
+   * @throws IOException If the data cannot be read
+   */
+  Iterable<T> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index d0df916..e8e34f2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -34,6 +34,7 @@ import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.ExtractKeyFn;
 import org.apache.crunch.impl.mem.MemPipeline;
@@ -165,6 +166,11 @@ public class MemCollection<S> implements PCollection<S> {
     return new CollectionPObject<S>(this);
   }
 
+  @Override
+  public ReadableData<S> asReadable(boolean materialize) {
+    return new MemReadableData<S>(collect);
+  }
+
   public Collection<S> getCollection() {
     return collect;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemReadableData.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemReadableData.java
new file mode 100644
index 0000000..dd5697a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemReadableData.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem.collect;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+class MemReadableData<T> implements ReadableData<T> {
+
+  private Collection<T> collection;
+
+  public MemReadableData(Collection<T> collection) {
+    this.collection = collection;
+  }
+
+  @Override
+  public Set<SourceTarget<?>> getSourceTargets() {
+    return ImmutableSet.of();
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    // No-op
+  }
+
+  @Override
+  public Iterable<T> read(TaskInputOutputContext<?, ?, ?, ?> ctxt) throws IOException {
+    return collection;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DelegatingReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DelegatingReadableData.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DelegatingReadableData.java
new file mode 100644
index 0000000..ce5399b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DelegatingReadableData.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * Implements the {@code ReadableData<T>} interface by delegating to an {@code ReadableData<S>} instance
+ * and passing its contents through a {@code DoFn<S, T>}.
+ */
+class DelegatingReadableData<S, T> implements ReadableData<T> {
+
+  private final ReadableData<S> delegate;
+  private final DoFn<S, T> fn;
+
+  public DelegatingReadableData(ReadableData<S> delegate, DoFn<S, T> fn) {
+    this.delegate = delegate;
+    this.fn = fn;
+  }
+
+  @Override
+  public Set<SourceTarget<?>> getSourceTargets() {
+    return delegate.getSourceTargets();
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    delegate.configure(conf);
+    fn.configure(conf);
+  }
+
+  @Override
+  public Iterable<T> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException {
+    fn.setContext(context);
+    fn.initialize();
+    final Iterable<S> delegateIterable = delegate.read(context);
+    return new Iterable<T>() {
+      @Override
+      public Iterator<T> iterator() {
+        return new DoFnIterator<S, T>(delegateIterable.iterator(), fn);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
index 50afb75..c0a761a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.crunch.DoFn;
 import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PType;
 
@@ -50,6 +51,14 @@ public class DoCollectionImpl<S> extends PCollectionImpl<S> {
   }
 
   @Override
+  protected ReadableData<S> getReadableDataInternal() {
+    if (getOnlyParent() instanceof PGroupedTableImpl) {
+      return materializedData();
+    }
+    return new DelegatingReadableData(getOnlyParent().asReadable(false), fn);
+  }
+
+  @Override
   public PType<S> getPType() {
     return ntype;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoFnIterator.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoFnIterator.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoFnIterator.java
new file mode 100644
index 0000000..ff102eb
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoFnIterator.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import com.google.common.collect.Lists;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * An {@code Iterator<T>} that combines a delegate {@code Iterator<S>} and a {@code DoFn<S, T>}, generating
+ * data by passing the contents of the iterator through the function. Note that the input {@code DoFn} should
+ * have both its {@code setContext} and {@code initialize} functions called <b>before</b> it is passed to
+ * the constructor.
+ *
+ * @param <S> The type of the delegate iterator
+ * @param <T> The returned type
+ */
+class DoFnIterator<S, T> implements Iterator<T> {
+
+  private final Iterator<S> iter;
+  private final DoFn<S, T> fn;
+  private CacheEmitter<T> cache;
+  private boolean cleanup;
+
+  public DoFnIterator(Iterator<S> iter, DoFn<S, T> fn) {
+    this.iter = iter;
+    this.fn = fn;
+    this.cache = new CacheEmitter<T>();
+    this.cleanup = false;
+  }
+
+  @Override
+  public boolean hasNext() {
+    while (cache.isEmpty() && iter.hasNext()) {
+      fn.process(iter.next(), cache);
+    }
+    if (cache.isEmpty() && !cleanup) {
+      fn.cleanup(cache);
+      cleanup = true;
+    }
+    return !cache.isEmpty();
+  }
+
+  @Override
+  public T next() {
+    return cache.poll();
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  private static class CacheEmitter<T> implements Emitter<T> {
+
+    private final LinkedList<T> cache;
+
+    private CacheEmitter() {
+      this.cache = Lists.newLinkedList();
+    }
+
+    public boolean isEmpty() {
+      return cache.isEmpty();
+    }
+
+    public T poll() {
+      return cache.poll();
+    }
+
+    @Override
+    public void emit(T emitted) {
+      cache.add(emitted);
+    }
+
+    @Override
+    public void flush() {
+      // No-op
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
index 28e2504..9c8e53d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
@@ -24,6 +24,7 @@ import org.apache.crunch.DoFn;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
@@ -59,6 +60,14 @@ public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V>
   }
 
   @Override
+  protected ReadableData<Pair<K, V>> getReadableDataInternal() {
+    if (getOnlyParent() instanceof PGroupedTableImpl) {
+      return materializedData();
+    }
+    return new DelegatingReadableData(getOnlyParent().asReadable(false), fn);
+  }
+
+  @Override
   protected void acceptInternal(PCollectionImpl.Visitor visitor) {
     visitor.visitDoTable(this);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
index a4958e7..c801e7f 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
@@ -20,9 +20,12 @@ package org.apache.crunch.impl.mr.collect;
 import java.util.List;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.Source;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.types.PType;
 
 import com.google.common.collect.ImmutableList;
@@ -38,6 +41,15 @@ public class InputCollection<S> extends PCollectionImpl<S> {
   }
 
   @Override
+  protected ReadableData<S> getReadableDataInternal() {
+    if (source instanceof ReadableSource) {
+      return ((ReadableSource<S>) source).asReadable();
+    } else {
+      return materializedData();
+    }
+  }
+
+  @Override
   public PType<S> getPType() {
     return source.getType();
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
index 8317452..bdd5c64 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
@@ -20,6 +20,7 @@ package org.apache.crunch.impl.mr.collect;
 import java.util.List;
 
 import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.DoNode;
@@ -54,6 +55,7 @@ public class InputTable<K, V> extends PTableBase<K, V> {
     return source.getTableType();
   }
 
+
   @Override
   public PType<Pair<K, V>> getPType() {
     return source.getType();
@@ -70,6 +72,11 @@ public class InputTable<K, V> extends PTableBase<K, V> {
   }
 
   @Override
+  protected ReadableData<Pair<K, V>> getReadableDataInternal() {
+    return asCollection.getReadableDataInternal();
+  }
+
+  @Override
   public DoNode createDoNode() {
     return DoNode.createInputNode(source);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index 958d7f6..b82c883 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -33,12 +33,14 @@ import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.ExtractKeyFn;
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.materialize.pobject.CollectionPObject;
 import org.apache.crunch.types.PTableType;
@@ -54,6 +56,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
 
   private final String name;
   protected MRPipeline pipeline;
+  private boolean materialized;
   protected SourceTarget<S> materializedAt;
   protected final ParallelDoOptions doOptions;
   
@@ -151,6 +154,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
       LOG.warn("Materializing an empty PCollection: " + this.getName());
       return Collections.emptyList();
     }
+    materialized = true;
     return getPipeline().materialize(this);
   }
 
@@ -276,6 +280,24 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   protected abstract void acceptInternal(Visitor visitor);
 
   @Override
+  public ReadableData<S> asReadable(boolean materialize) {
+    if (materializedAt != null && (materializedAt instanceof ReadableSource)) {
+      return ((ReadableSource) materializedAt).asReadable();
+    } else if (materialized || materialize) {
+      return ((MRPipeline) getPipeline()).getMaterializeSourceTarget(this).asReadable();
+    } else {
+      return getReadableDataInternal();
+    }
+  }
+
+  protected ReadableData<S> materializedData() {
+    materialized = true;
+    return pipeline.getMaterializeSourceTarget(this).asReadable();
+  }
+
+  protected abstract ReadableData<S> getReadableDataInternal();
+
+  @Override
   public long getSize() {
     if (materializedAt != null) {
       long sz = materializedAt.getSize(getPipeline().getConfiguration());

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
index 7eb2b09..e62d9c3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
@@ -31,6 +31,7 @@ import org.apache.crunch.MapFn;
 import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.fn.Aggregators;
 import org.apache.crunch.impl.mr.plan.DoNode;
@@ -77,6 +78,15 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
   }
 
+  /**
+   * Grouped tables have no readable form of their own, so this always throws.
+   */
+  @Override
+  protected ReadableData<Pair<K, Iterable<V>>> getReadableDataInternal() {
+    String reason = "PGroupedTable does not currently support readability";
+    throw new UnsupportedOperationException(reason);
+  }
+
   @Override
   protected long getSizeInternal() {
     return parent.getSizeInternal();
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
index b6e1fdd..4a69d96 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
@@ -19,6 +19,8 @@ package org.apache.crunch.impl.mr.collect;
 
 import java.util.List;
 
+import com.google.common.collect.Lists;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PType;
@@ -73,6 +75,26 @@ public class UnionCollection<S> extends PCollectionImpl<S> {
   }
 
+  /**
+   * Concatenates the readable views of all parents. If any parent is a grouped table, which has
+   * no readable form of its own, this union's materialized data is returned instead.
+   */
+  @Override
+  protected ReadableData<S> getReadableDataInternal() {
+    // Look for grouped parents first, so parents are not asked for readable data (which may mark
+    // them as materialized, e.g. a nested union) when that data would be discarded anyway.
+    for (PCollectionImpl<S> parent : parents) {
+      if (parent instanceof PGroupedTableImpl) {
+        return materializedData();
+      }
+    }
+    List<ReadableData<S>> prds = Lists.newArrayList();
+    for (PCollectionImpl<S> parent : parents) {
+      prds.add(parent.asReadable(false));
+    }
+    return new UnionReadableData<S>(prds);
+  }
+
   @Override
   public PType<S> getPType() {
     return parents.get(0).getPType();
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionReadableData.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionReadableData.java
new file mode 100644
index 0000000..58a10c3
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionReadableData.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A {@link ReadableData} that chains several other {@code ReadableData} instances together, in
+ * the order in which they were supplied.
+ */
+class UnionReadableData<T> implements ReadableData<T> {
+
+  private final List<ReadableData<T>> data;
+
+  /**
+   * @param data the members of the union; the list is kept as given, not copied
+   */
+  public UnionReadableData(List<ReadableData<T>> data) {
+    this.data = data;
+  }
+
+  /** Collects the source targets of every member into a new mutable set. */
+  @Override
+  public Set<SourceTarget<?>> getSourceTargets() {
+    Set<SourceTarget<?>> combined = Sets.newHashSet();
+    for (ReadableData<T> member : data) {
+      combined.addAll(member.getSourceTargets());
+    }
+    return combined;
+  }
+
+  /** Hands the same configuration to every member, in order. */
+  @Override
+  public void configure(Configuration conf) {
+    for (ReadableData<T> member : data) {
+      member.configure(conf);
+    }
+  }
+
+  /**
+   * Calls {@code read} on every member up front and returns the results concatenated in member
+   * order.
+   */
+  @Override
+  public Iterable<T> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException {
+    List<Iterable<T>> parts = Lists.newArrayListWithCapacity(data.size());
+    for (ReadableData<T> member : data) {
+      parts.add(member.read(context));
+    }
+    return Iterables.concat(parts);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
index 91f518a..b6a26d5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
@@ -20,6 +20,7 @@ package org.apache.crunch.impl.mr.collect;
 import java.util.List;
 
 import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PTableType;
@@ -94,6 +95,26 @@ public class UnionTable<K, V> extends PTableBase<K, V> {
   }
 
+  /**
+   * Concatenates the readable views of all parents. If any parent is a grouped table, which has
+   * no readable form of its own, this union's materialized data is returned instead.
+   */
+  @Override
+  protected ReadableData<Pair<K, V>> getReadableDataInternal() {
+    // Look for grouped parents first, so parents are not asked for readable data (which may mark
+    // them as materialized, e.g. a nested union) when that data would be discarded anyway.
+    for (PCollectionImpl<Pair<K, V>> parent : parents) {
+      if (parent instanceof PGroupedTableImpl) {
+        return materializedData();
+      }
+    }
+    List<ReadableData<Pair<K, V>>> prds = Lists.newArrayList();
+    for (PCollectionImpl<Pair<K, V>> parent : parents) {
+      prds.add(parent.asReadable(false));
+    }
+    return new UnionReadableData<Pair<K, V>>(prds);
+  }
+
   @Override
   public DoNode createDoNode() {
     throw new UnsupportedOperationException("Unioned table does not support do nodes");
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/ReadableSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/ReadableSource.java b/crunch-core/src/main/java/org/apache/crunch/io/ReadableSource.java
index 0407167..fcb4cbd 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/ReadableSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/ReadableSource.java
@@ -19,6 +19,7 @@ package org.apache.crunch.io;
 
 import java.io.IOException;
 
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.Source;
 import org.apache.hadoop.conf.Configuration;
 
@@ -38,4 +39,10 @@ public interface ReadableSource<T> extends Source<T> {
    * @throws IOException
    */
   Iterable<T> read(Configuration conf) throws IOException;
+
+  /**
+   * Returns a {@link ReadableData} view of the data referenced by this {@code ReadableSource}.
+   * @return a {@code ReadableData} over this source's data, readable within a task context
+   */
+  ReadableData<T> asReadable();
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
index 8415d12..a961016 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.AvroJob;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileSourceImpl;
@@ -71,6 +72,20 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour
     return read(conf, getFileReaderFactory((AvroType<T>) ptype));
   }
 
+  /**
+   * Exposes this source's files as {@link AvroReadableData} decoded with this source's Avro
+   * type.
+   *
+   * <p>NOTE(review): unlike {@code read(Configuration)}, this does not go through
+   * {@code getFileReaderFactory}, so the {@code reader} that method passes to its factory is not
+   * used here; confirm that this is intended.
+   */
+  @Override
+  public ReadableData<T> asReadable() {
+    AvroType<T> avroType = (AvroType<T>) ptype;
+    return new AvroReadableData<T>(paths, avroType);
+  }
+
   protected AvroFileReaderFactory<T> getFileReaderFactory(AvroType<T> ptype){
     return new AvroFileReaderFactory(reader, ptype);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroReadableData.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroReadableData.java
new file mode 100644
index 0000000..dda61d7
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroReadableData.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.ReadableDataImpl;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+
+/**
+ * {@link ReadableDataImpl} for Avro data files, which are opened with an
+ * {@link AvroFileReaderFactory} built from the configured Avro type.
+ */
+public class AvroReadableData<T> extends ReadableDataImpl<T> {
+
+  private final AvroType<T> avroType;
+
+  /**
+   * @param paths the paths to read
+   * @param avroType the Avro type handed to the reader factory
+   */
+  public AvroReadableData(List<Path> paths, AvroType<T> avroType) {
+    super(paths);
+    this.avroType = avroType;
+  }
+
+  /** Creates a new Avro reader factory for the configured type on every call. */
+  @Override
+  protected FileReaderFactory<T> getFileReaderFactory() {
+    return new AvroFileReaderFactory<T>(this.avroType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
index 4cc902a..3f387d7 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
@@ -21,6 +21,7 @@ import java.util.List;
 import org.apache.avro.mapred.AvroJob;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
@@ -56,4 +57,14 @@ public class TrevniKeySource<T> extends FileSourceImpl<T> implements ReadableSou
   public Iterable<T> read(Configuration conf) throws IOException {
     return read(conf, new TrevniFileReaderFactory<T>((AvroType<T>) ptype));
   }
+
+  /**
+   * Exposes this source's Trevni files as {@link TrevniReadableData} decoded with this source's
+   * Avro type.
+   */
+  @Override
+  public ReadableData<T> asReadable() {
+    AvroType<T> avroType = (AvroType<T>) ptype;
+    return new TrevniReadableData<T>(paths, avroType);
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniReadableData.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniReadableData.java
new file mode 100644
index 0000000..5a681c4
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniReadableData.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro.trevni;
+
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.ReadableDataImpl;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+
+/**
+ * {@link ReadableDataImpl} for Trevni files, which are opened with a
+ * {@link TrevniFileReaderFactory} built from the configured Avro type.
+ */
+public class TrevniReadableData<T> extends ReadableDataImpl<T> {
+
+  private final AvroType<T> avroType;
+
+  /**
+   * @param paths the paths to read
+   * @param avroType the Avro type handed to the reader factory
+   */
+  public TrevniReadableData(List<Path> paths, AvroType<T> avroType) {
+    super(paths);
+    this.avroType = avroType;
+  }
+
+  /** Creates a new Trevni reader factory for the configured type on every call. */
+  @Override
+  protected FileReaderFactory<T> getFileReaderFactory() {
+    return new TrevniFileReaderFactory<T>(this.avroType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableDataImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableDataImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableDataImpl.java
new file mode 100644
index 0000000..3b5c1bb
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableDataImpl.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.util.DistCache;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Base class for {@link ReadableData} backed by files that are shipped to tasks through the
+ * distributed cache. Subclasses supply the {@link FileReaderFactory} used to decode the files.
+ */
+public abstract class ReadableDataImpl<T> implements ReadableData<T> {
+
+  // Kept as strings rather than Path objects, presumably so instances stay serializable
+  // (TODO confirm).
+  private final List<String> paths;
+  // transient: excluded from serialization; only used by getSourceTargets().
+  private transient SourceTarget<?> parent;
+
+  /**
+   * @param paths the files to read; {@link #configure(Configuration)} adds each of them to the
+   *     distributed cache
+   */
+  protected ReadableDataImpl(List<Path> paths) {
+    this.paths = Lists.newArrayListWithCapacity(paths.size());
+    for (Path p : paths) {
+      this.paths.add(p.toString());
+    }
+  }
+
+  /**
+   * Records the {@link SourceTarget} this data was created from, so that it is reported by
+   * {@link #getSourceTargets()}.
+   *
+   * @return this instance
+   */
+  public ReadableData<T> setParent(SourceTarget<?> parent) {
+    this.parent = parent;
+    return this;
+  }
+
+  /** Returns the parent source target if one was set, otherwise an empty set. */
+  @Override
+  public Set<SourceTarget<?>> getSourceTargets() {
+    if (parent != null) {
+      return ImmutableSet.<SourceTarget<?>>of(parent);
+    } else {
+      return ImmutableSet.of();
+    }
+  }
+
+  /** Adds every path to the distributed cache of the given configuration. */
+  @Override
+  public void configure(Configuration conf) {
+    for (String path : paths) {
+      DistCache.addCacheFile(new Path(path), conf);
+    }
+  }
+
+  /** Returns the factory used to open each locally cached file. */
+  protected abstract FileReaderFactory<T> getFileReaderFactory();
+
+  /**
+   * Resolves the local distributed-cache copy of {@code input}.
+   *
+   * @throws CrunchRuntimeException if no local cache file is found
+   */
+  private Path getCacheFilePath(String input, Configuration conf) {
+    Path local = DistCache.getPathToCacheFile(new Path(input), conf);
+    if (local == null) {
+      throw new CrunchRuntimeException("Can't find local cache file for '" + input + "'");
+    }
+    return local;
+  }
+
+  /**
+   * Reads every path from its local distributed-cache copy. The returned iterable is lazy: each
+   * path is resolved and opened only when iteration reaches it, in path order.
+   *
+   * @throws CrunchRuntimeException during iteration if a cached file is missing or cannot be
+   *     opened
+   */
+  @Override
+  public Iterable<T> read(TaskInputOutputContext<?, ?, ?, ?> ctxt) throws IOException {
+    final Configuration conf = ctxt.getConfiguration();
+    final FileReaderFactory<T> readerFactory = getFileReaderFactory();
+    return Iterables.concat(Lists.transform(paths, new Function<String, Iterable<T>>() {
+      @Override
+      public Iterable<T> apply(@Nullable String input) {
+        Path path = getCacheFilePath(input, conf);
+        try {
+          FileSystem fs = path.getFileSystem(conf);
+          return CompositePathIterable.create(fs, path, readerFactory);
+        } catch (IOException e) {
+          // Same unchecked type as getCacheFilePath uses, keeping the path and the cause.
+          throw new CrunchRuntimeException("Could not open cached file '" + path + "'", e);
+        }
+      }
+    }));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
index 6506816..df1f0fa 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
@@ -19,6 +19,7 @@ package org.apache.crunch.io.impl;
 
 import java.io.IOException;
 
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.io.ReadableSource;
@@ -36,4 +37,19 @@ public class ReadableSourcePathTargetImpl<T> extends SourcePathTargetImpl<T> imp
     return ((ReadableSource<T>) source).read(conf);
   }
 
+  /**
+   * Returns the wrapped source's {@link ReadableData}. When that is a {@link ReadableDataImpl},
+   * this source target is first registered as its parent so that it is reported by
+   * {@code getSourceTargets()}.
+   */
+  @Override
+  public ReadableData<T> asReadable() {
+    ReadableData<T> readable = ((ReadableSource<T>) source).asReadable();
+    if (!(readable instanceof ReadableDataImpl)) {
+      return readable;
+    }
+    ((ReadableDataImpl<T>) readable).setParent(this);
+    return readable;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
index f435b3b..13475fb 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
@@ -19,6 +19,7 @@ package org.apache.crunch.io.impl;
 
 import java.io.IOException;
 
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.Target;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.ReadableSourceTarget;
@@ -34,4 +35,18 @@ public class ReadableSourceTargetImpl<T> extends SourceTargetImpl<T> implements
   public Iterable<T> read(Configuration conf) throws IOException {
     return ((ReadableSource<T>) source).read(conf);
   }
+
+  /**
+   * Delegates to the wrapped source's {@code asReadable()}. A {@link ReadableDataImpl} result
+   * has this source target set as its parent before it is returned.
+   */
+  @Override
+  public ReadableData<T> asReadable() {
+    ReadableData<T> delegate = ((ReadableSource<T>) source).asReadable();
+    if (delegate instanceof ReadableDataImpl) {
+      ReadableDataImpl<T> impl = (ReadableDataImpl<T>) delegate;
+      impl.setParent(this);
+    }
+    return delegate;
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
index 81678d4..76e80ef 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.avro.AvroType;
@@ -53,6 +54,16 @@ public class AvroParquetFileSource<T> extends FileSourceImpl<T> implements Reada
     return read(conf, getFileReaderFactory((AvroType<T>) ptype));
   }
 
+  /**
+   * Exposes this source's Parquet files as {@link AvroParquetReadableData} decoded with this
+   * source's Avro type.
+   */
+  @Override
+  public ReadableData<T> asReadable() {
+    AvroType<T> avroType = (AvroType<T>) ptype;
+    return new AvroParquetReadableData<T>(paths, avroType);
+  }
+
   protected AvroParquetFileReaderFactory<T> getFileReaderFactory(AvroType<T> ptype){
     return new AvroParquetFileReaderFactory<T>(ptype);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetReadableData.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetReadableData.java
new file mode 100644
index 0000000..9673c14
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetReadableData.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.ReadableDataImpl;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+
+/**
+ * {@link ReadableDataImpl} for Avro-encoded Parquet files, which are opened with an
+ * {@link AvroParquetFileReaderFactory} built from the configured Avro type.
+ */
+public class AvroParquetReadableData<T> extends ReadableDataImpl<T> {
+
+  private final AvroType<T> avroType;
+
+  /**
+   * @param paths the paths to read
+   * @param avroType the Avro type handed to the reader factory
+   */
+  public AvroParquetReadableData(List<Path> paths, AvroType<T> avroType) {
+    super(paths);
+    this.avroType = avroType;
+  }
+
+  /** Creates a new Avro/Parquet reader factory for the configured type on every call. */
+  @Override
+  protected FileReaderFactory<T> getFileReaderFactory() {
+    return new AvroParquetFileReaderFactory<T>(this.avroType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileReadableData.java b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileReadableData.java
new file mode 100644
index 0000000..356870a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileReadableData.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.seq;
+
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.ReadableDataImpl;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+
+/**
+ * {@link ReadableDataImpl} for SequenceFiles, which are opened with a
+ * {@link SeqFileReaderFactory} built from the configured {@link PType}.
+ */
+public class SeqFileReadableData<T> extends ReadableDataImpl<T> {
+  private final PType<T> ptype;
+
+  /**
+   * @param paths the paths to read
+   * @param ptype the type handed to the SequenceFile reader factory
+   */
+  public SeqFileReadableData(List<Path> paths, PType<T> ptype) {
+    super(paths);
+    this.ptype = ptype;
+  }
+
+  /** Creates a new SequenceFile reader factory for the configured type on every call. */
+  @Override
+  protected FileReaderFactory<T> getFileReaderFactory() {
+    return new SeqFileReaderFactory<T>(ptype);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
index 9e6edc8..1bf64e4 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import java.util.List;
 import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
@@ -43,6 +44,14 @@ public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSourc
   }
 
+  /**
+   * Exposes this source's files as {@link SeqFileReadableData} of this source's type.
+   */
+  @Override
+  public ReadableData<T> asReadable() {
+    return new SeqFileReadableData<T>(this.paths, this.ptype);
+  }
+
   @Override
   public String toString() {
     return "SeqFile(" + pathsAsString() + ")";
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
index cecafeb..134e9ba 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import java.util.List;
 import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileTableSourceImpl;
 import org.apache.crunch.types.PTableType;
@@ -52,6 +53,15 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen
   }
 
+  /**
+   * Exposes this source's files as {@link SeqFileReadableData} of key/value pairs, typed by this
+   * source's table type.
+   */
+  @Override
+  public ReadableData<Pair<K, V>> asReadable() {
+    return new SeqFileReadableData<Pair<K, V>>(this.paths, getTableType());
+  }
+
   @Override
   public String toString() {
     return "SeqFile(" + pathsAsString() + ")";
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
index 0756b70..dededff 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import java.util.List;
 
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
@@ -87,4 +88,12 @@ public class NLineFileSource<T> extends FileSourceImpl<T> implements ReadableSou
   public Iterable<T> read(Configuration conf) throws IOException {
     return read(conf, new TextFileReaderFactory<T>(LineParser.forType(ptype)));
   }
+
+  /**
+   * Exposes this source's files as {@link TextReadableData} of this source's type.
+   */
+  @Override
+  public ReadableData<T> asReadable() {
+    return new TextReadableData<T>(this.paths, this.ptype);
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java
index 5f9d4c2..fe23c47 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import java.util.List;
 import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.avro.AvroTypeFamily;
@@ -57,4 +58,12 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements ReadableSour
   public Iterable<T> read(Configuration conf) throws IOException {
     return read(conf, new TextFileReaderFactory<T>(LineParser.forType(ptype)));
   }
+
+  /**
+   * Exposes this source's text files as {@link TextReadableData} of this source's type.
+   */
+  @Override
+  public ReadableData<T> asReadable() {
+    return new TextReadableData<T>(this.paths, this.ptype);
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
index 66b2e67..5c00a5e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.crunch.Pair;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.io.impl.FileTableSourceImpl;
 import org.apache.crunch.types.PTableType;
 import org.apache.hadoop.conf.Configuration;
@@ -88,4 +89,9 @@ public class TextFileTableSource<K, V> extends FileTableSourceImpl<K, V>
         new TextFileReaderFactory<Pair<K, V>>(LineParser.forTableType(getTableType(),
             separator)));
   }
+
+  @Override
+  public ReadableData<Pair<K, V>> asReadable() {
+    return new TextReadableData<Pair<K, V>>(paths, getTableType(), separator);
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/io/text/TextReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextReadableData.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextReadableData.java
new file mode 100644
index 0000000..5baab94
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextReadableData.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.ReadableDataImpl;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+
+public class TextReadableData<T> extends ReadableDataImpl<T> {
+
+  private final PType<T> ptype;
+  private final String sep;
+
+  public TextReadableData(List<Path> paths, PType<T> ptype) {
+    this(paths, ptype, null);
+  }
+
+  public TextReadableData(List<Path> paths, PType<T> ptype, String sep) {
+    super(paths);
+    this.ptype = ptype;
+    this.sep = sep;
+  }
+
+  @Override
+  protected FileReaderFactory<T> getFileReaderFactory() {
+    if (sep == null) {
+      return new TextFileReaderFactory<T>(ptype);
+    } else {
+      return new TextFileReaderFactory<T>(LineParser.forTableType(((PTableType) ptype), sep));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
index 1710f30..680bb2e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
@@ -26,17 +26,11 @@ import org.apache.crunch.Emitter;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.ParallelDoOptions;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.materialize.MaterializableIterable;
-import org.apache.crunch.types.PType;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.util.DistCache;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 
 /**
@@ -50,6 +44,29 @@ import com.google.common.collect.Multimap;
  */
 public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
 
+  private boolean materialize;
+
+  /**
+   * Constructs a new instance of the {@code MapsideJoinStrategy}, materializing the right-side
+   * join table to disk before the join is performed.
+   */
+  public MapsideJoinStrategy() {
+    this(true);
+  }
+
+  /**
+   * Constructs a new instance of the {@code MapsideJoinStrategy}. If the {@code materialize}
+   * argument is true, then the right-side join {@code PTable} will be materialized to disk
+   * before the in-memory join is performed. If it is false, then Crunch can optionally read
+   * and process the data from the right-side table without having to run a job to materialize
+   * the data to disk first.
+   *
+   * @param materialize Whether or not to materialize the right-side table before the join
+   */
+  public MapsideJoinStrategy(boolean materialize) {
+    this.materialize = materialize;
+  }
+
   @Override
   public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) {
     switch (joinType) {
@@ -66,96 +83,43 @@ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
 
   private PTable<K, Pair<U,V>> joinInternal(PTable<K, U> left, PTable<K, V> right, boolean includeUnmatchedLeftValues) {
     PTypeFamily tf = left.getTypeFamily();
-    Iterable<Pair<K, V>> iterable = right.materialize();
-
-    if (iterable instanceof MaterializableIterable) {
-      MaterializableIterable<Pair<K, V>> mi = (MaterializableIterable<Pair<K, V>>) iterable;
-      MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(),
-          includeUnmatchedLeftValues, right.getPType());
-      ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder();
-      if (mi.isSourceTarget()) {
-        optionsBuilder.sourceTargets((SourceTarget) mi.getSource());
-      }
-      return left.parallelDo("mapjoin", mapJoinDoFn,
-          tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())),
-          optionsBuilder.build());
-    } else { // in-memory pipeline
-      return left.parallelDo(new InMemoryJoinFn<K, U, V>(iterable, includeUnmatchedLeftValues),
-          tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())));
-    }
+    ReadableData<Pair<K, V>> rightReadable = right.asReadable(materialize);
+    MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(rightReadable, includeUnmatchedLeftValues);
+    ParallelDoOptions options = ParallelDoOptions.builder()
+        .sourceTargets(rightReadable.getSourceTargets())
+        .build();
+    return left.parallelDo("mapjoin", mapJoinDoFn,
+        tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())),
+        options);
   }
 
-  static class InMemoryJoinFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
-
-    private Multimap<K, V> joinMap;
-    private boolean includeUnmatched;
-    
-    public InMemoryJoinFn(Iterable<Pair<K, V>> iterable, boolean includeUnmatched) {
-      joinMap = HashMultimap.create();
-      for (Pair<K, V> joinPair : iterable) {
-        joinMap.put(joinPair.first(), joinPair.second());
-      }
-      this.includeUnmatched = includeUnmatched;
-    }
-    
-    @Override
-    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
-      K key = input.first();
-      U value = input.second();
-      Collection<V> joinValues = joinMap.get(key);
-      if (includeUnmatched && joinValues.isEmpty()) {
-        emitter.emit(Pair.of(key, Pair.of(value, (V)null)));
-      } else {
-        for (V joinValue : joinValues) {
-          Pair<U, V> valuePair = Pair.of(value, joinValue);
-          emitter.emit(Pair.of(key, valuePair));
-        }
-      }
-    }
-  }
-  
   static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
 
-    private String inputPath;
+    private final ReadableData<Pair<K, V>> readable;
     private final boolean includeUnmatched;
-    private PType<Pair<K, V>> ptype;
     private Multimap<K, V> joinMap;
 
-    public MapsideJoinDoFn(String inputPath, boolean includeUnmatched, PType<Pair<K, V>> ptype) {
-      this.inputPath = inputPath;
+    public MapsideJoinDoFn(ReadableData<Pair<K, V>> rs, boolean includeUnmatched) {
+      this.readable = rs;
       this.includeUnmatched = includeUnmatched;
-      this.ptype = ptype;
-    }
-
-    private Path getCacheFilePath() {
-      Path local = DistCache.getPathToCacheFile(new Path(inputPath), getConfiguration());
-      if (local == null) {
-        throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
-      }
-      return local;
     }
 
     @Override
     public void configure(Configuration conf) {
-      DistCache.addCacheFile(new Path(inputPath), conf);
+      readable.configure(conf);
     }
     
     @Override
     public void initialize() {
       super.initialize();
 
-      ReadableSourceTarget<Pair<K, V>> sourceTarget = ptype.getDefaultFileSource(
-          getCacheFilePath());
-      Iterable<Pair<K, V>> iterable = null;
+      joinMap = ArrayListMultimap.create();
       try {
-        iterable = sourceTarget.read(getConfiguration());
+        for (Pair<K, V> joinPair : readable.read(getContext())) {
+          joinMap.put(joinPair.first(), joinPair.second());
+        }
       } catch (IOException e) {
-        throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
-      }
-
-      joinMap = ArrayListMultimap.create();
-      for (Pair<K, V> joinPair : iterable) {
-        joinMap.put(joinPair.first(), joinPair.second());
+        throw new CrunchRuntimeException("Error reading map-side join data", e);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
index b025119..4b607b1 100644
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.Writables;
@@ -103,6 +104,11 @@ public class DoCollectionImplTest {
     }
 
     @Override
+    protected ReadableData<String> getReadableDataInternal() {
+      return null;
+    }
+
+    @Override
     protected long getSizeInternal() {
       return internalSize;
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
new file mode 100644
index 0000000..84c39db
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Result>> {
+
+  private final String table;
+  private final String scanAsString;
+  private transient SourceTarget parent;
+
+  public HBaseData(String table, String scanAsString, SourceTarget<?> parent) {
+    this.table = table;
+    this.scanAsString = scanAsString;
+    this.parent = parent;
+  }
+
+  @Override
+  public Set<SourceTarget<?>> getSourceTargets() {
+    if (parent != null) {
+      return ImmutableSet.<SourceTarget<?>>of(parent);
+    } else {
+      return ImmutableSet.of();
+    }
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    // No-op
+  }
+
+  @Override
+  public Iterable<Pair<ImmutableBytesWritable, Result>> read(
+      TaskInputOutputContext<?, ?, ?, ?> ctxt) throws IOException {
+    Configuration hconf = HBaseConfiguration.create(ctxt.getConfiguration());
+    HTable htable = new HTable(hconf, table);
+    Scan scan = HBaseSourceTarget.convertStringToScan(scanAsString);
+    return new HTableIterable(htable, scan);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index 1b2a03e..c1d7eb7 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -17,15 +17,17 @@
  */
 package org.apache.crunch.io.hbase;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.Source;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.TableSource;
@@ -42,7 +44,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
@@ -123,13 +124,22 @@ public class HBaseSourceTarget extends HBaseTarget implements
     }
   }
 
-  static String convertScanToString(Scan scan) throws IOException {
+  public static String convertScanToString(Scan scan) throws IOException {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     DataOutputStream dos = new DataOutputStream(out);
     scan.write(dos);
     return Base64.encodeBytes(out.toByteArray());
   }
 
+  public static Scan convertStringToScan(String string) throws IOException {
+    ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decode(string));
+    DataInputStream dais = new DataInputStream(bais);
+    Scan scan = new Scan();
+    scan.readFields(dais);
+    dais.close();
+    return scan;
+  }
+
   @Override
   public long getSize(Configuration conf) {
     // TODO something smarter here.
@@ -155,66 +165,19 @@ public class HBaseSourceTarget extends HBaseTarget implements
   }
 
   @Override
+  public ReadableData<Pair<ImmutableBytesWritable, Result>> asReadable() {
+    try {
+      return new HBaseData(table, convertScanToString(scan), this);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
   public SourceTarget<Pair<ImmutableBytesWritable, Result>> conf(String key, String value) {
     inputConf(key, value);
     outputConf(key, value);
     return this;
   }
 
-  private static class HTableIterable implements Iterable<Pair<ImmutableBytesWritable, Result>> {
-    private final HTable table;
-    private final Scan scan;
-
-    public HTableIterable(HTable table, Scan scan) {
-      this.table = table;
-      this.scan = scan;
-    }
-
-    @Override
-    public Iterator<Pair<ImmutableBytesWritable, Result>> iterator() {
-      try {
-        return new HTableIterator(table, table.getScanner(scan));
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  private static class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> {
-
-    private final HTable table;
-    private final ResultScanner scanner;
-    private final Iterator<Result> iter;
-
-    public HTableIterator(HTable table, ResultScanner scanner) {
-      this.table = table;
-      this.scanner = scanner;
-      this.iter = scanner.iterator();
-    }
-
-    @Override
-    public boolean hasNext() {
-      boolean hasNext = iter.hasNext();
-      if (!hasNext) {
-        scanner.close();
-        try {
-          table.close();
-        } catch (IOException e) {
-          LOG.error("Exception closing HTable: " + table.getTableName(), e);
-        }
-      }
-      return hasNext;
-    }
-
-    @Override
-    public Pair<ImmutableBytesWritable, Result> next() {
-      Result next = iter.next();
-      return Pair.of(new ImmutableBytesWritable(next.getRow()), next);
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReadableData.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReadableData.java
new file mode 100644
index 0000000..0b94424
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReadableData.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.ReadableDataImpl;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+
+import java.util.List;
+
+public class HFileReadableData extends ReadableDataImpl<KeyValue> {
+
+  public HFileReadableData(List<Path> paths) {
+    super(paths);
+  }
+
+  @Override
+  protected FileReaderFactory<KeyValue> getFileReaderFactory() {
+    return new HFileReaderFactory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
index 31d314d..3ce18bd 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.io.SourceTargetHelper;
 import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.crunch.types.PType;
@@ -82,6 +83,11 @@ public class HFileSource extends FileSourceImpl<KeyValue> implements ReadableSou
   }
 
   @Override
+  public ReadableData<KeyValue> asReadable() {
+    return new HFileReadableData(paths);
+  }
+
+  @Override
   public String toString() {
     return "HFile(" + pathsAsString() + ")";
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
new file mode 100644
index 0000000..c58732c
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
@@ -0,0 +1,48 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.crunch.io.hbase;
+
+import org.apache.crunch.Pair;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+class HTableIterable implements Iterable<Pair<ImmutableBytesWritable, Result>> {
+  private final HTable table;
+  private final Scan scan;
+
+  public HTableIterable(HTable table, Scan scan) {
+    this.table = table;
+    this.scan = scan;
+  }
+
+  @Override
+  public Iterator<Pair<ImmutableBytesWritable, Result>> iterator() {
+    try {
+      return new HTableIterator(table, table.getScanner(scan));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}


Mime
View raw message