crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject crunch git commit: CRUNCH-489: Add methods for creating PCollections from Java Iterables on the client and optionally naming created/input PCollections.
Date Mon, 26 Jan 2015 02:15:15 GMT
Repository: crunch
Updated Branches:
  refs/heads/master ab7e5e4c3 -> 006cd72a3


CRUNCH-489: Add methods for creating PCollections from Java Iterables on the client
and optionally naming created/input PCollections.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/006cd72a
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/006cd72a
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/006cd72a

Branch: refs/heads/master
Commit: 006cd72a383d91ba15ec0d6b596872a062567d48
Parents: ab7e5e4
Author: Josh Wills <jwills@apache.org>
Authored: Thu Jan 22 15:38:57 2015 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Sun Jan 25 09:48:35 2015 -0800

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/PageRankIT.java   |  32 ++--
 .../java/org/apache/crunch/io/NLineInputIT.java |  25 ++-
 .../crunch/io/avro/AvroFileSourceTargetIT.java  |  19 ++
 .../crunch/io/avro/AvroMemPipelineIT.java       |   6 +-
 .../java/org/apache/crunch/CreateOptions.java   |  59 ++++++
 .../main/java/org/apache/crunch/Pipeline.java   |  62 +++++++
 .../crunch/impl/dist/DistributedPipeline.java   |  55 +++++-
 .../impl/dist/collect/BaseInputCollection.java  |   4 +-
 .../impl/dist/collect/BaseInputTable.java       |   6 +-
 .../impl/dist/collect/PCollectionFactory.java   |   2 +
 .../impl/dist/collect/PCollectionImpl.java      |   9 +-
 .../crunch/impl/dist/collect/PTableBase.java    |   4 +-
 .../org/apache/crunch/impl/mem/MemPipeline.java |  52 ++++--
 .../crunch/impl/mr/collect/InputCollection.java |   4 +-
 .../crunch/impl/mr/collect/InputTable.java      |   4 +-
 .../impl/mr/collect/MRCollectionFactory.java    |   6 +-
 .../java/org/apache/crunch/types/PType.java     |  14 ++
 .../crunch/types/avro/AvroGroupedTableType.java |  11 ++
 .../org/apache/crunch/types/avro/AvroType.java  |  46 +++++
 .../writable/WritableGroupedTableType.java      |  13 ++
 .../types/writable/WritableTableType.java       |  36 ++++
 .../crunch/types/writable/WritableType.java     |  56 ++++++
 .../src/it/java/org/apache/crunch/CreateIT.java |  88 +++++++++
 .../java/org/apache/crunch/SparkPageRankIT.java |  27 ++-
 .../apache/crunch/impl/spark/SparkPipeline.java |  14 ++
 .../impl/spark/collect/CreatedCollection.java   | 149 +++++++++++++++
 .../crunch/impl/spark/collect/CreatedTable.java | 180 +++++++++++++++++++
 .../impl/spark/collect/InputCollection.java     |   6 +-
 .../crunch/impl/spark/collect/InputTable.java   |   6 +-
 .../impl/spark/collect/SparkCollectFactory.java |   6 +-
 .../crunch/impl/spark/serde/AvroSerDe.java      |   6 -
 .../crunch/impl/spark/serde/SerDeFactory.java   |  39 ++++
 32 files changed, 977 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


Review note: URLS is a shared constant, so it is declared final to prevent reassignment.
http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
index 23c71b3..701f78a 100644
--- a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import java.util.Collection;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.lib.Aggregate;
@@ -41,6 +42,19 @@ import com.google.common.collect.Lists;
 
 public class PageRankIT {
 
+  private static final List<String> URLS = ImmutableList.of(
+          "www.A.com       www.B.com",
+          "www.A.com       www.C.com",
+          "www.A.com       www.D.com",
+          "www.A.com       www.E.com",
+          "www.B.com       www.D.com",
+          "www.B.com       www.E.com",
+          "www.C.com       www.D.com",
+          "www.D.com       www.B.com",
+          "www.E.com       www.A.com",
+          "www.F.com       www.B.com",
+          "www.F.com       www.C.com");
+
   public static class PageRankData {
     public float score;
     public float lastScore;
@@ -76,35 +90,31 @@ public class PageRankIT {
   public void testAvroReflect() throws Exception {
     PTypeFamily tf = AvroTypeFamily.getInstance();
     PType<PageRankData> prType = Avros.reflects(PageRankData.class);
-    String urlInput = tmpDir.copyResourceFileName("urls.txt");
     run(new MRPipeline(PageRankIT.class, tmpDir.getDefaultConfiguration()),
-        urlInput, prType, tf);
+        prType, tf);
   }
 
   @Test
   public void testAvroMReflectInMemory() throws Exception {
     PTypeFamily tf = AvroTypeFamily.getInstance();
     PType<PageRankData> prType = Avros.reflects(PageRankData.class);
-    String urlInput = tmpDir.copyResourceFileName("urls.txt");
-    run(MemPipeline.getInstance(), urlInput, prType, tf);
+    run(MemPipeline.getInstance(), prType, tf);
   }
 
   @Test
   public void testAvroJSON() throws Exception {
     PTypeFamily tf = AvroTypeFamily.getInstance();
     PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
-    String urlInput = tmpDir.copyResourceFileName("urls.txt");
     run(new MRPipeline(PageRankIT.class, tmpDir.getDefaultConfiguration()),
-        urlInput, prType, tf);
+        prType, tf);
   }
 
   @Test
   public void testWritablesJSON() throws Exception {
     PTypeFamily tf = WritableTypeFamily.getInstance();
     PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
-    String urlInput = tmpDir.copyResourceFileName("urls.txt");
     run(new MRPipeline(PageRankIT.class, tmpDir.getDefaultConfiguration()),
-        urlInput, prType, tf);
+        prType, tf);
   }
 
   public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input, final float d) {
@@ -134,13 +144,13 @@ public class PageRankIT {
         }, input.getValueType());
   }
 
-  public static void run(Pipeline pipeline, String urlInput,
+  public static void run(Pipeline pipeline,
       PType<PageRankData> prType, PTypeFamily ptf) throws Exception {
-    PTable<String, PageRankData> scores = pipeline.readTextFile(urlInput)
+    PTable<String, PageRankData> scores = pipeline.create(URLS, ptf.strings())
         .parallelDo(new MapFn<String, Pair<String, String>>() {
           @Override
           public Pair<String, String> map(String input) {
-            String[] urls = input.split("\\t");
+            String[] urls = input.split("\\s+");
             return Pair.of(urls[0], urls[1]);
           }
         }, ptf.tableOf(ptf.strings(), ptf.strings())).groupByKey()

Review note: URLS is declared final and its elements get a continuation indent. testNLine no longer reads through NLineFileSource (its import and use are removed); it now exercises Pipeline#create with CreateOptions.parallelism(6) instead.
http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/it/java/org/apache/crunch/io/NLineInputIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/NLineInputIT.java b/crunch-core/src/it/java/org/apache/crunch/io/NLineInputIT.java
index 52b8ff5..2f54b8a 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/NLineInputIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/NLineInputIT.java
@@ -19,12 +19,13 @@ package org.apache.crunch.io;
 
 import static org.junit.Assert.assertEquals;
 
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.CreateOptions;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.text.NLineFileSource;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.writable.Writables;
@@ -33,19 +34,33 @@ import org.apache.hadoop.conf.Configuration;
 import org.junit.Rule;
 import org.junit.Test;
 
+import java.util.List;
+
 public class NLineInputIT {
 
+  private static final List<String> URLS = ImmutableList.of(
+          "www.A.com       www.B.com",
+          "www.A.com       www.C.com",
+          "www.A.com       www.D.com",
+          "www.A.com       www.E.com",
+          "www.B.com       www.D.com",
+          "www.B.com       www.E.com",
+          "www.C.com       www.D.com",
+          "www.D.com       www.B.com",
+          "www.E.com       www.A.com",
+          "www.F.com       www.B.com",
+          "www.F.com       www.C.com");
+
   @Rule
   public TemporaryPath tmpDir = TemporaryPaths.create();
-  
+
   @Test
   public void testNLine() throws Exception {
-    String urlsInputPath = tmpDir.copyResourceFileName("urls.txt");
     Configuration conf = new Configuration(tmpDir.getDefaultConfiguration());
     conf.setInt("io.sort.mb", 10);
     Pipeline pipeline = new MRPipeline(NLineInputIT.class, conf);
-    PCollection<String> urls = pipeline.read(new NLineFileSource<String>(urlsInputPath,
-        Writables.strings(), 2));
+    PCollection<String> urls = pipeline.create(URLS, Writables.strings(),
+        CreateOptions.parallelism(6));
     assertEquals(new Integer(2),
         urls.parallelDo(new LineCountFn(), Avros.ints()).max().getValue());
   }

Review note: testGenericCreate compares recordList directly; it is already a freshly built List, so the extra Lists.newArrayList copy is dropped.
http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
index 511e827..6c3f340 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
@@ -191,6 +192,24 @@ public class AvroFileSourceTargetIT implements Serializable {
   }
 
   @Test
+  public void testGenericCreate() throws IOException {
+    String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson");
+    Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson);
+    GenericData.Record savedRecord = new GenericData.Record(genericPersonSchema);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+
+    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Record> genericCollection = pipeline.create(ImmutableList.of(savedRecord),
+            Avros.generics(genericPersonSchema));
+
+    List<Record> recordList = Lists.newArrayList(genericCollection.materialize());
+
+    assertEquals(Lists.newArrayList(savedRecord), recordList);
+  }
+
+  @Test
   public void testReflect() throws IOException {
     Schema pojoPersonSchema = ReflectData.get().getSchema(StringWrapper.class);
     GenericRecord savedRecord = new GenericData.Record(pojoPersonSchema);

Review note: the test now builds its single-record input with MemPipeline.getInstance().create(ImmutableList.of(writeRecord), ...) instead of MemPipeline.typedCollectionOf(...).
http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
index e501373..142f5fb 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.avro.generic.GenericData;
@@ -63,9 +64,8 @@ public class AvroMemPipelineIT implements Serializable {
 
     Person writeRecord = createSpecificRecord();
 
-    final PCollection<Person> writeCollection = MemPipeline.typedCollectionOf(
-                                                  Avros.specifics(Person.class),
-                                                  writeRecord);
+    final PCollection<Person> writeCollection = MemPipeline.getInstance().create(
+            ImmutableList.of(writeRecord), Avros.specifics(Person.class));
 
     writeCollection.write(To.avroFile(avroFile.getAbsolutePath()));
 

Review note: CreateOptions is an immutable holder (private final fields, private constructor, no setters). Each factory defaults the name to "CREATED" and the parallelism to 1 unless supplied. The constructor rejects a null name and any parallelism below 1.
http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/CreateOptions.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/CreateOptions.java b/crunch-core/src/main/java/org/apache/crunch/CreateOptions.java
new file mode 100644
index 0000000..18bcc83
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/CreateOptions.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Additional options that can be specified when creating a new PCollection using {@link Pipeline#create}.
+ */
+public class CreateOptions {
+
+  public static CreateOptions none() {
+    return new CreateOptions("CREATED", 1);
+  }
+
+  public static CreateOptions parallelism(int parallelism) {
+    return new CreateOptions("CREATED", parallelism);
+  }
+
+  public static CreateOptions name(String name) {
+    return new CreateOptions(name, 1);
+  }
+
+  public static CreateOptions nameAndParallelism(String name, int parallelism) {
+    return new CreateOptions(name, parallelism);
+  }
+
+  private final String name;
+  private final int parallelism;
+
+  private CreateOptions(String name, int parallelism) {
+    this.name = Preconditions.checkNotNull(name);
+    Preconditions.checkArgument(parallelism > 0, "Invalid parallelism value = %d", parallelism);
+    this.parallelism = parallelism;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public int getParallelism() {
+    return parallelism;
+  }
+}

Review note: the Javadoc opener for read(TableSource, String) is re-indented to two spaces to match the surrounding interface members.
http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
index cd3f3f6..ee11fee 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
@@ -55,6 +55,16 @@ public interface Pipeline {
   <T> PCollection<T> read(Source<T> source);
 
   /**
+   * Converts the given {@code Source} into a {@code PCollection} that is
+   * available to jobs run using this {@code Pipeline} instance.
+   *
+   * @param source The source of data
+   * @param named A name for the returned PCollection
+   * @return A PCollection that references the given source
+   */
+  <T> PCollection<T> read(Source<T> source, String named);
+
+  /**
    * A version of the read method for {@code TableSource} instances that map to
    * {@code PTable}s.
    * 
@@ -64,6 +74,16 @@ public interface Pipeline {
    */
   <K, V> PTable<K, V> read(TableSource<K, V> tableSource);
 
+  /**
+   * A version of the read method for {@code TableSource} instances that map to
+   * {@code PTable}s.
+   *
+   * @param tableSource The source of the data
+   * @param named A name for the returned PTable
+   * @return A PTable that references the given source
+   */
+  <K, V> PTable<K, V> read(TableSource<K, V> tableSource, String named);
+
   /**
    * Write the given collection to the given target on the next pipeline run. The
    * system will check to see if the target's location already exists using the
@@ -128,6 +148,48 @@ public interface Pipeline {
   <K, V> PTable<K, V> emptyPTable(PTableType<K, V> ptype);
 
   /**
+   * Creates a {@code PCollection} containing the values found in the given {@code Iterable}
+   * using an implementation-specific distribution mechanism.
+   *
+   * @param contents The values the new PCollection will contain
+   * @param ptype The PType of the PCollection
+   * @return A PCollection that contains the given values
+   */
+  <T> PCollection<T> create(Iterable<T> contents, PType<T> ptype);
+
+  /**
+   * Creates a {@code PCollection} containing the values found in the given {@code Iterable}
+   * using an implementation-specific distribution mechanism.
+   *
+   * @param contents The values the new PCollection will contain
+   * @param ptype The PType of the PCollection
+   * @param options Additional options, such as the name or desired parallelism of the PCollection
+   * @return A PCollection that contains the given values
+   */
+  <T> PCollection<T> create(Iterable<T> contents, PType<T> ptype, CreateOptions options);
+
+  /**
+   * Creates a {@code PTable} containing the values found in the given {@code Iterable}
+   * using an implementation-specific distribution mechanism.
+   *
+   * @param contents The values the new PTable will contain
+   * @param ptype The PTableType of the PTable
+   * @return A PTable that contains the given values
+   */
+  <K, V> PTable<K, V> create(Iterable<Pair<K, V>> contents, PTableType<K, V> ptype);
+
+  /**
+   * Creates a {@code PTable} containing the values found in the given {@code Iterable}
+   * using an implementation-specific distribution mechanism.
+   *
+   * @param contents The values the new PTable will contain
+   * @param ptype The PTableType of the PTable
+   * @param options Additional options, such as the name or desired parallelism of the PTable
+   * @return A PTable that contains the given values
+   */
+  <K, V> PTable<K, V> create(Iterable<Pair<K, V>> contents, PTableType<K, V> ptype, CreateOptions options);
+
+  /**
    * Executes the given {@code PipelineCallable} on the client after the {@code Targets}
    * that the PipelineCallable depends on (if any) have been created by other pipeline
    * processing steps.

Review note: create(..., CreateOptions) previously dropped options.getName(). The created source is now read under that name, and the PTable variant also names its identity stage with it, so the name option documented on Pipeline#create reaches the job plan. Hunk line counts are unchanged.
http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
index 64510f4..61c01f1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
@@ -17,16 +17,19 @@
  */
 package org.apache.crunch.impl.dist;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.crunch.CreateOptions;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
 import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.Pipeline;
-import org.apache.crunch.PipelineResult;
 import org.apache.crunch.PipelineCallable;
+import org.apache.crunch.PipelineResult;
 import org.apache.crunch.Source;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.TableSource;
@@ -133,11 +136,19 @@ public abstract class DistributedPipeline implements Pipeline {
   }
 
   public <S> PCollection<S> read(Source<S> source) {
-    return factory.createInputCollection(source, this, getCurrentPDoOptions());
+    return read(source, null);
+  }
+
+  public <S> PCollection<S> read(Source<S> source, String named) {
+    return factory.createInputCollection(source, named, this, getCurrentPDoOptions());
   }
 
   public <K, V> PTable<K, V> read(TableSource<K, V> source) {
-    return factory.createInputTable(source, this, getCurrentPDoOptions());
+    return read(source, null);
+  }
+
+  public <K, V> PTable<K, V> read(TableSource<K, V> source, String named) {
+    return factory.createInputTable(source, named, this, getCurrentPDoOptions());
   }
 
   private ParallelDoOptions getCurrentPDoOptions() {
@@ -228,6 +239,44 @@ public abstract class DistributedPipeline implements Pipeline {
     return new EmptyPTable<K, V>(this, ptype);
   }
 
+  @Override
+  public <S> PCollection<S> create(Iterable<S> contents, PType<S> ptype) {
+    return create(contents, ptype, CreateOptions.none());
+  }
+
+  @Override
+  public <S> PCollection<S> create(Iterable<S> contents, PType<S> ptype, CreateOptions options) {
+    if (Iterables.isEmpty(contents)) {
+      return emptyPCollection(ptype);
+    }
+    ReadableSource<S> src = null;
+    try {
+      src = ptype.createSourceTarget(getConfiguration(), createTempPath(), contents, options.getParallelism());
+    } catch (IOException e) {
+      throw new CrunchRuntimeException("Error creating PCollection: " + contents, e);
+    }
+    return read(src, options.getName());
+  }
+
+  @Override
+  public <K, V> PTable<K, V> create(Iterable<Pair<K, V>> contents, PTableType<K, V> ptype) {
+    return create(contents, ptype, CreateOptions.none());
+  }
+
+  @Override
+  public <K, V> PTable<K, V> create(Iterable<Pair<K, V>> contents, PTableType<K, V> ptype, CreateOptions options) {
+    if (Iterables.isEmpty(contents)) {
+      return emptyPTable(ptype);
+    }
+    ReadableSource<Pair<K, V>> src = null;
+    try {
+      src = ptype.createSourceTarget(getConfiguration(), createTempPath(), contents, options.getParallelism());
+    } catch (IOException e) {
+      throw new CrunchRuntimeException("Error creating PTable: " + contents, e);
+    }
+    return read(src, options.getName()).parallelDo(options.getName(), IdentityFn.<Pair<K, V>>getInstance(), ptype);
+  }
+
   /**
    * Retrieve a ReadableSourceTarget that provides access to the contents of a {@link PCollection}.
    * This is primarily intended as a helper method to {@link #materialize(PCollection)}. The

Review note: a null name falls back to source.toString(), so callers that pass null (such as DistributedPipeline#read(source)) keep the previous collection name.
http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java
index 8d3887d..2a7e5c2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java
@@ -37,8 +37,8 @@ public class BaseInputCollection<S> extends PCollectionImpl<S> {
     this.source = source;
   }
 
-  public BaseInputCollection(Source<S> source, DistributedPipeline pipeline, ParallelDoOptions doOpts) {
-    super(source.toString(), pipeline, doOpts);
+  public BaseInputCollection(Source<S> source, String name, DistributedPipeline pipeline, ParallelDoOptions doOpts) {
+    super(name == null ? source.toString() : name, pipeline, doOpts);
     this.source = source;
   }
 

Review note: the named constructor now applies the supplied name to the table itself, not only to its backing input collection. It falls back to source.toString() when the name is null, matching BaseInputCollection. The hunk still spans 13 old and 13 new lines.
http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java
index cbab255..18a671b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java
@@ -37,13 +37,13 @@ public class BaseInputTable<K, V> extends PTableBase<K, V> {
     super(source.toString(), pipeline);
     this.source = source;
     this.asCollection = pipeline.getFactory().createInputCollection(
-        source, pipeline, ParallelDoOptions.builder().build());
+        source, source.toString(), pipeline, ParallelDoOptions.builder().build());
   }
 
-  public BaseInputTable(TableSource<K, V> source, DistributedPipeline pipeline, ParallelDoOptions doOpts) {
-    super(source.toString(), pipeline, doOpts);
+  public BaseInputTable(TableSource<K, V> source, String name, DistributedPipeline pipeline, ParallelDoOptions doOpts) {
+    super(name == null ? source.toString() : name, pipeline, doOpts);
     this.source = source;
-    this.asCollection = pipeline.getFactory().createInputCollection(source, pipeline, doOpts);
+    this.asCollection = pipeline.getFactory().createInputCollection(source, name, pipeline, doOpts);
   }
 
   public TableSource<K, V> getSource() {

Review note: both input factory methods gain a `named` parameter. DistributedPipeline#read(source) passes null for it, so implementations must accept a null name.
http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java
index 9077fc9..4cd7d03 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java
@@ -35,11 +35,13 @@ public interface PCollectionFactory {
 
   <S> BaseInputCollection<S> createInputCollection(
       Source<S> source,
+      String named,
       DistributedPipeline distributedPipeline,
       ParallelDoOptions doOpts);
 
   <K, V> BaseInputTable<K, V> createInputTable(
       TableSource<K,V> source,
+      String named,
       DistributedPipeline distributedPipeline,
       ParallelDoOptions doOpts);
 

Review note: a materialized collection is re-read under this collection's own name (getName()) in write() and accept(). The continuation indent in accept() now matches the two write() methods.
http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
index e2287ff..f098f2b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
@@ -169,7 +169,9 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
 
   public PCollection<S> write(Target target) {
     if (materializedAt != null) {
-      getPipeline().write(pipeline.getFactory().createInputCollection(materializedAt, pipeline, doOptions), target);
+      getPipeline().write(
+          pipeline.getFactory().createInputCollection(materializedAt, getName(), pipeline, doOptions),
+          target);
     } else {
       getPipeline().write(this, target);
     }
@@ -180,7 +182,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   public PCollection<S> write(Target target, Target.WriteMode writeMode) {
     if (materializedAt != null) {
       getPipeline().write(
-          pipeline.getFactory().createInputCollection(materializedAt, pipeline, doOptions),
+          pipeline.getFactory().createInputCollection(materializedAt, getName(), pipeline, doOptions),
           target,
           writeMode);
     } else {
@@ -203,7 +205,8 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
 
   public void accept(Visitor visitor) {
     if (materializedAt != null) {
-      visitor.visitInputCollection(pipeline.getFactory().createInputCollection(materializedAt, pipeline, doOptions));
+      visitor.visitInputCollection(
+          pipeline.getFactory().createInputCollection(materializedAt, getName(), pipeline, doOptions));
     } else {
       acceptInternal(visitor);
     }

Review note: when the table has been materialized, both write() overloads re-read it as an input table under this table's own name (getName()).
http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
index a893b9e..e81773e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
@@ -92,7 +92,7 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple
   public PTable<K, V> write(Target target) {
     if (getMaterializedAt() != null) {
       getPipeline().write(pipeline.getFactory().createInputTable(
-          (TableSource<K, V>) getMaterializedAt(), pipeline, doOptions), target);
+          (TableSource<K, V>) getMaterializedAt(), getName(), pipeline, doOptions), target);
     } else {
       getPipeline().write(this, target);
     }
@@ -103,7 +103,7 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple
   public PTable<K, V> write(Target target, Target.WriteMode writeMode) {
     if (getMaterializedAt() != null) {
       getPipeline().write(pipeline.getFactory().createInputTable(
-          (TableSource<K, V>) getMaterializedAt(), pipeline, doOptions), target, writeMode);
+          (TableSource<K, V>) getMaterializedAt(), getName(), pipeline, doOptions), target, writeMode);
     } else {
       getPipeline().write(this, target, writeMode);
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 49e5662..e61b6dc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -31,14 +30,15 @@ import com.google.common.base.Charsets;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.crunch.CachingOptions;
+import org.apache.crunch.CreateOptions;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineCallable;
 import org.apache.crunch.PipelineExecution;
 import org.apache.crunch.PipelineResult;
-import org.apache.crunch.PipelineCallable;
 import org.apache.crunch.Source;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.Target;
@@ -151,32 +151,44 @@ public class MemPipeline implements Pipeline {
 
   @Override
   public <T> PCollection<T> read(Source<T> source) {
+    return read(source, null);
+  }
+
+  @Override
+  public <T> PCollection<T> read(Source<T> source, String named) {
+    String name = named == null ? source.toString() : named;
     if (source instanceof ReadableSource) {
       try {
         Iterable<T> iterable = ((ReadableSource<T>) source).read(conf);
-        return new MemCollection<T>(iterable, source.getType(), source.toString());
+        return new MemCollection<T>(iterable, source.getType(), name);
       } catch (IOException e) {
-        LOG.error("Exception reading source: " + source.toString(), e);
+        LOG.error("Exception reading source: " + name, e);
         throw new IllegalStateException(e);
       }
     }
-    LOG.error("Source {} is not readable", source);
-    throw new IllegalStateException("Source " + source + " is not readable");
+    LOG.error("Source {} is not readable", name);
+    throw new IllegalStateException("Source " + name + " is not readable");
   }
 
   @Override
   public <K, V> PTable<K, V> read(TableSource<K, V> source) {
+    return read(source, null);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> read(TableSource<K, V> source, String named) {
+    String name = named == null ? source.toString() : named;
     if (source instanceof ReadableSource) {
       try {
         Iterable<Pair<K, V>> iterable = ((ReadableSource<Pair<K, V>>) source).read(conf);
-        return new MemTable<K, V>(iterable, source.getTableType(), source.toString());
+        return new MemTable<K, V>(iterable, source.getTableType(), name);
       } catch (IOException e) {
-        LOG.error("Exception reading source: " + source, e);
+        LOG.error("Exception reading source: " + name, e);
         throw new IllegalStateException(e);
       }
     }
-    LOG.error("Source {} is not readable", source);
-    throw new IllegalStateException("Source " + source + " is not readable");
+    LOG.error("Source {} is not readable", name);
+    throw new IllegalStateException("Source " + name + " is not readable");
   }
 
   @Override
@@ -332,6 +344,26 @@ public class MemPipeline implements Pipeline {
   }
 
   @Override
+  public <T> PCollection<T> create(Iterable<T> contents, PType<T> ptype) {
+    return create(contents, ptype, CreateOptions.none());
+  }
+
+  @Override
+  public <T> PCollection<T> create(Iterable<T> iterable, PType<T> ptype, CreateOptions options) {
+    return typedCollectionOf(ptype, iterable);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> create(Iterable<Pair<K, V>> contents, PTableType<K, V> ptype) {
+    return create(contents, ptype, CreateOptions.none());
+  }
+
+  @Override
+  public <K, V> PTable<K, V> create(Iterable<Pair<K, V>> contents, PTableType<K, V> ptype, CreateOptions options) {
+    return typedTableOf(ptype, contents);
+  }
+
+  @Override
   public <Output> Output sequentialDo(PipelineCallable<Output> callable) {
     Output out = callable.generateOutput(this);
     try {

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
index ea189f8..79a427e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
@@ -28,8 +28,8 @@ import org.apache.crunch.io.ReadableSource;
 
 public class InputCollection<S> extends BaseInputCollection<S> implements MRCollection {
 
-  public InputCollection(Source<S> source, MRPipeline pipeline, ParallelDoOptions doOpts) {
-    super(source, pipeline, doOpts);
+  public InputCollection(Source<S> source, String name, MRPipeline pipeline, ParallelDoOptions doOpts) {
+    super(source, name, pipeline, doOpts);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
index 3154190..380382c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
@@ -26,8 +26,8 @@ import org.apache.crunch.impl.mr.plan.DoNode;
 
 public class InputTable<K, V> extends BaseInputTable<K, V> implements MRCollection {
 
-  public InputTable(TableSource<K, V> source, MRPipeline pipeline, ParallelDoOptions doOpts) {
-    super(source, pipeline, doOpts);
+  public InputTable(TableSource<K, V> source, String name, MRPipeline pipeline, ParallelDoOptions doOpts) {
+    super(source, name, pipeline, doOpts);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java
index ede88f2..4e0ef7d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java
@@ -52,17 +52,19 @@ public class MRCollectionFactory implements PCollectionFactory {
   @Override
   public <S> BaseInputCollection<S> createInputCollection(
       Source<S> source,
+      String name,
       DistributedPipeline pipeline,
       ParallelDoOptions doOpts) {
-    return new InputCollection<S>(source, (MRPipeline) pipeline, doOpts);
+    return new InputCollection<S>(source, name, (MRPipeline) pipeline, doOpts);
   }
 
   @Override
   public <K, V> BaseInputTable<K, V> createInputTable(
       TableSource<K, V> source,
+      String name,
       DistributedPipeline pipeline,
       ParallelDoOptions doOpts) {
-    return new InputTable<K, V>(source, (MRPipeline) pipeline, doOpts);
+    return new InputTable<K, V>(source, name, (MRPipeline) pipeline, doOpts);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/types/PType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PType.java b/crunch-core/src/main/java/org/apache/crunch/types/PType.java
index ebddf84..258ce79 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/PType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PType.java
@@ -17,12 +17,14 @@
  */
 package org.apache.crunch.types;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
 import org.apache.crunch.DoFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
+import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -80,6 +82,18 @@ public interface PType<T> extends Serializable {
   ReadableSourceTarget<T> getDefaultFileSource(Path path);
 
   /**
+   * Returns a {@code ReadableSource} that contains the data in the given {@code Iterable}.
+   *
+   * @param conf The Configuration to use
+   * @param path The path to write the data to
+   * @param contents The contents to write
+   * @param parallelism The desired parallelism
+   * @return A new instance of ReadableSource
+   */
+  ReadableSource<T> createSourceTarget(Configuration conf, Path path, Iterable<T> contents, int parallelism)
+    throws IOException;
+
+  /**
    * Returns the sub-types that make up this PType if it is a composite instance, such as a tuple.
    */
   List<PType> getSubTypes();

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
index 3df313f..c359afc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.types.avro;
 
+import java.io.IOException;
 import java.util.Collection;
 
 import org.apache.avro.mapred.AvroJob;
@@ -27,11 +28,13 @@ import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
 import org.apache.crunch.fn.PairMapFn;
+import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.lib.PTables;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PTableType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 
 /**
@@ -112,4 +115,12 @@ class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
     }
   }
 
+  @Override
+  public ReadableSource<Pair<K, Iterable<V>>> createSourceTarget(
+          Configuration conf,
+          Path path,
+          Iterable<Pair<K, Iterable<V>>> contents,
+          int parallelism) throws IOException {
+    throw new UnsupportedOperationException("GroupedTableTypes do not support creating ReadableSources");
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
index 4e35b91..9dbf6b0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -17,22 +17,32 @@
  */
 package org.apache.crunch.types.avro;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.io.avro.AvroFileSource;
 import org.apache.crunch.io.avro.AvroFileSourceTarget;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.DeepCopier;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -50,6 +60,7 @@ public class AvroType<T> implements PType<T> {
     GENERIC
   }
 
+  private static final Logger LOG = LoggerFactory.getLogger(AvroType.class);
   private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
 
   private final Class<T> typeClass;
@@ -197,6 +208,41 @@ public class AvroType<T> implements PType<T> {
   }
 
   @Override
+  public ReadableSource<T> createSourceTarget(Configuration conf, Path path, Iterable<T> contents, int parallelism)
+    throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    baseOutputMapFn.setConfiguration(conf);
+    baseOutputMapFn.initialize();
+    fs.mkdirs(path);
+    List<FSDataOutputStream> streams = Lists.newArrayListWithExpectedSize(parallelism);
+    List<DataFileWriter> writers = Lists.newArrayListWithExpectedSize(parallelism);
+    for (int i = 0; i < parallelism; i++) {
+      Path out = new Path(path, "out" + i);
+      FSDataOutputStream stream = fs.create(out);
+      DatumWriter datumWriter = Avros.newWriter(this);
+      DataFileWriter writer = new DataFileWriter(datumWriter);
+      writer.create(getSchema(), stream);
+
+      streams.add(stream);
+      writers.add(writer);
+    }
+    int target = 0;
+    for (T value : contents) {
+      writers.get(target).append(baseOutputMapFn.map(value));
+      target = (target + 1) % parallelism;
+    }
+    for (DataFileWriter writer : writers) {
+      writer.close();
+    }
+    for (FSDataOutputStream stream : streams) {
+      stream.close();
+    }
+    ReadableSource<T> ret = new AvroFileSource<T>(path, this);
+    ret.inputConf(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
+    return ret;
+  }
+
+  @Override
   public void initialize(Configuration conf) {
     baseInputMapFn.setConfiguration(conf);
     baseInputMapFn.initialize();

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
index 3167591..c25345b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
@@ -20,12 +20,16 @@ package org.apache.crunch.types.writable;
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
+import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.lib.PTables;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 
+import java.io.IOException;
+
 class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
 
   private final MapFn inputFn;
@@ -73,6 +77,15 @@ class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
   }
 
   @Override
+  public ReadableSource<Pair<K, Iterable<V>>> createSourceTarget(
+      Configuration conf,
+      Path path,
+      Iterable<Pair<K, Iterable<V>>> contents,
+      int parallelism) throws IOException {
+    throw new UnsupportedOperationException("GroupedTableTypes do not support creating ReadableSources");
+  }
+
+  @Override
   public void configureShuffle(Job job, GroupingOptions options) {
     if (options != null) {
       options.configure(job);

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
index 93e0fd6..d0cc915 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
@@ -17,13 +17,19 @@
  */
 package org.apache.crunch.types.writable;
 
+import java.io.IOException;
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
 import org.apache.crunch.fn.PairMapFn;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.io.seq.SeqFileSource;
+import org.apache.crunch.io.seq.SeqFileTableSource;
 import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
 import org.apache.crunch.lib.PTables;
 import org.apache.crunch.types.Converter;
@@ -32,7 +38,10 @@ import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 
 import com.google.common.collect.ImmutableList;
@@ -103,6 +112,33 @@ class WritableTableType<K, V> implements PTableType<K, V> {
   }
 
   @Override
+  public ReadableSource<Pair<K, V>> createSourceTarget(
+          Configuration conf, Path path, Iterable<Pair<K, V>> contents, int parallelism) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    outputFn.setConfiguration(conf);
+    outputFn.initialize();
+    fs.mkdirs(path);
+    List<SequenceFile.Writer> writers = Lists.newArrayListWithExpectedSize(parallelism);
+    for (int i = 0; i < parallelism; i++) {
+      Path out = new Path(path, "out" + i);
+      writers.add(SequenceFile.createWriter(fs, conf, out, keyType.getSerializationClass(),
+              valueType.getSerializationClass()));
+    }
+    int target = 0;
+    for (Pair<K, V> value : contents) {
+      Pair writablePair = (Pair) outputFn.map(value);
+      writers.get(target).append(writablePair.first(), writablePair.second());
+      target = (target + 1) % parallelism;
+    }
+    for (SequenceFile.Writer writer : writers) {
+      writer.close();
+    }
+    ReadableSource<Pair<K, V>> ret = new SeqFileTableSource<K, V>(path, this);
+    ret.inputConf(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
+    return ret;
+  }
+
+  @Override
   public void initialize(Configuration conf) {
     keyType.initialize(conf);
     valueType.initialize(conf);

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java
index ea3019b..2baf14b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java
@@ -17,23 +17,40 @@
  */
 package org.apache.crunch.types.writable;
 
+import java.io.IOException;
 import java.util.List;
 
+import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.MapFn;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.io.seq.SeqFileSource;
 import org.apache.crunch.io.seq.SeqFileSourceTarget;
+import org.apache.crunch.io.text.NLineFileSource;
+import org.apache.crunch.io.text.TextFileSource;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.DeepCopier;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class WritableType<T, W extends Writable> implements PType<T> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(WritableType.class);
+
   private final Class<T> typeClass;
   private final Class<W> writableClass;
   private final Converter converter;
@@ -112,6 +129,45 @@ public class WritableType<T, W extends Writable> implements PType<T> {
   }
 
   @Override
+  public ReadableSource<T> createSourceTarget(Configuration conf, Path path, Iterable<T> contents, int parallelism)
+    throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    outputFn.setConfiguration(conf);
+    outputFn.initialize();
+    if (Text.class.equals(writableClass) && parallelism > 1) {
+      FSDataOutputStream out = fs.create(path);
+      byte[] newLine = "\r\n".getBytes(Charsets.UTF_8);
+      double contentSize = 0;
+      for (T value : contents) {
+        Text txt = (Text) outputFn.map(value);
+        out.write(txt.toString().getBytes(Charsets.UTF_8));
+        out.write(newLine);
+        contentSize++;
+      }
+      out.close();
+      return new NLineFileSource<T>(path, this, (int) Math.ceil(contentSize / parallelism));
+    } else { // Use sequence files
+      fs.mkdirs(path);
+      List<SequenceFile.Writer> writers = Lists.newArrayListWithExpectedSize(parallelism);
+      for (int i = 0; i < parallelism; i++) {
+        Path out = new Path(path, "out" + i);
+        writers.add(SequenceFile.createWriter(fs, conf, out, NullWritable.class, writableClass));
+      }
+      int target = 0;
+      for (T value : contents) {
+        writers.get(target).append(NullWritable.get(), outputFn.map(value));
+        target = (target + 1) % parallelism;
+      }
+      for (SequenceFile.Writer writer : writers) {
+        writer.close();
+      }
+      ReadableSource<T> ret = new SeqFileSource<T>(path, this);
+      ret.inputConf(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
+      return ret;
+    }
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (obj == null || !(obj instanceof WritableType)) {
       return false;

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/it/java/org/apache/crunch/CreateIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/CreateIT.java b/crunch-spark/src/it/java/org/apache/crunch/CreateIT.java
new file mode 100644
index 0000000..0fca3ec
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/CreateIT.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class CreateIT {
+
+  @Rule
+  public TemporaryPath tmpDir = new TemporaryPath();
+
+  @Test
+  public void testMRWritable() throws Exception {
+    run(new MRPipeline(CreateIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMRAvro() throws Exception {
+    run(new MRPipeline(CreateIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMemWritable() throws Exception {
+    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMemAvro() throws Exception {
+    run(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testSparkWritable() throws Exception {
+    run(new SparkPipeline("local", "CreateIT", CreateIT.class, tmpDir.getDefaultConfiguration()),
+            WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testSparkAvro() throws Exception {
+    run(new SparkPipeline("local", "CreateIT", CreateIT.class, tmpDir.getDefaultConfiguration()),
+            AvroTypeFamily.getInstance());
+  }
+
+  public static void run(Pipeline p, PTypeFamily ptf) {
+    PTable<String, Long> in = p.create(
+            ImmutableList.of(
+                    Pair.of("a", 2L), Pair.of("b", 3L), Pair.of("c", 5L),
+                    Pair.of("a", 1L), Pair.of("b", 8L), Pair.of("c", 7L)),
+            ptf.tableOf(ptf.strings(), ptf.longs()),
+            CreateOptions.nameAndParallelism("in", 2));
+    PTable<String, Long> out = in.groupByKey().combineValues(Aggregators.SUM_LONGS());
+    Map<String, Long> values = out.materializeToMap();
+    assertEquals(3, values.size());
+    assertEquals(3L, values.get("a").longValue());
+    assertEquals(11L, values.get("b").longValue());
+    assertEquals(12L, values.get("c").longValue());
+    p.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java
index 47b9300..91adb8a 100644
--- a/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import org.apache.crunch.impl.spark.SparkPipeline;
@@ -39,6 +40,19 @@ import static org.junit.Assert.assertEquals;
 
 public class SparkPageRankIT {
 
+  private static List<String> URLS = ImmutableList.of(
+          "www.A.com       www.B.com",
+          "www.A.com       www.C.com",
+          "www.A.com       www.D.com",
+          "www.A.com       www.E.com",
+          "www.B.com       www.D.com",
+          "www.B.com       www.E.com",
+          "www.C.com       www.D.com",
+          "www.D.com       www.B.com",
+          "www.E.com       www.A.com",
+          "www.F.com       www.B.com",
+          "www.F.com       www.C.com");
+
   public static class PageRankData {
     public float score;
     public float lastScore;
@@ -80,8 +94,7 @@ public class SparkPageRankIT {
   public void testAvroReflects() throws Exception {
     PTypeFamily tf = AvroTypeFamily.getInstance();
     PType<PageRankData> prType = Avros.reflects(PageRankData.class);
-    String urlInput = tmpDir.copyResourceFileName("urls.txt");
-    run(pipeline, urlInput, prType, tf);
+    run(pipeline, prType, tf);
     pipeline.done();
   }
 
@@ -89,8 +102,7 @@ public class SparkPageRankIT {
   public void testWritablesJSON() throws Exception {
     PTypeFamily tf = WritableTypeFamily.getInstance();
     PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
-    String urlInput = tmpDir.copyResourceFileName("urls.txt");
-    run(pipeline, urlInput, prType, tf);
+    run(pipeline, prType, tf);
     pipeline.done();
   }
 
@@ -121,13 +133,13 @@ public class SparkPageRankIT {
         }, input.getValueType());
   }
 
-  public static void run(Pipeline pipeline, String urlInput,
+  public static void run(Pipeline pipeline,
                          PType<PageRankData> prType, PTypeFamily ptf) throws Exception {
-    PTable<String, PageRankData> scores = pipeline.readTextFile(urlInput)
+    PTable<String, PageRankData> scores = pipeline.create(URLS, ptf.strings())
         .parallelDo(new MapFn<String, Pair<String, String>>() {
           @Override
           public Pair<String, String> map(String input) {
-            String[] urls = input.split("\\t");
+            String[] urls = input.split("\\s+");
             return Pair.of(urls[0], urls[1]);
           }
         }, ptf.tableOf(ptf.strings(), ptf.strings())).groupByKey()
@@ -150,5 +162,6 @@ public class SparkPageRankIT {
       }, ptf.floats())).getValue();
     }
     assertEquals(0.0048, delta, 0.001);
+    pipeline.done();
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
index 3367d3c..8108de8 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
@@ -20,12 +20,16 @@ package org.apache.crunch.impl.spark;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import org.apache.crunch.CachingOptions;
+import org.apache.crunch.CreateOptions;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
 import org.apache.crunch.PipelineExecution;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.spark.collect.CreatedCollection;
+import org.apache.crunch.impl.spark.collect.CreatedTable;
 import org.apache.crunch.impl.spark.collect.EmptyPCollection;
 import org.apache.crunch.impl.spark.collect.EmptyPTable;
 import org.apache.crunch.impl.spark.collect.SparkCollectFactory;
@@ -92,6 +96,16 @@ public class SparkPipeline extends DistributedPipeline {
   }
 
   @Override
+  public <S> PCollection<S> create(Iterable<S> contents, PType<S> ptype, CreateOptions options) {
+    return new CreatedCollection<S>(this, contents, ptype, options);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> create(Iterable<Pair<K, V>> contents, PTableType<K, V> ptype, CreateOptions options) {
+    return new CreatedTable<K, V>(this, contents, ptype, options);
+  }
+
+  @Override
   public <T> void cache(PCollection<T> pcollection, CachingOptions options) {
     cachedCollections.put(pcollection, toStorageLevel(options));
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/CreatedCollection.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/CreatedCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/CreatedCollection.java
new file mode 100644
index 0000000..8c473ad
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/CreatedCollection.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.crunch.CreateOptions;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.spark.ByteArray;
+import org.apache.crunch.impl.spark.SparkCollection;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.crunch.impl.spark.SparkRuntimeContext;
+import org.apache.crunch.impl.spark.serde.SerDe;
+import org.apache.crunch.impl.spark.serde.SerDeFactory;
+import org.apache.crunch.types.PType;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.storage.StorageLevel;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Represents a Spark-based PCollection that was created from a Java {@code Iterable} of
+ * values.
+ *
+ * <p>For Spark execution the values are serialized on the client with a {@link SerDe}, passed
+ * to {@code parallelize}, and mapped back to {@code T} inside the resulting RDD. When the data is
+ * read directly ({@link #getReadableDataInternal()}), the contents are handed to
+ * {@link PType#createSourceTarget} with a temporary path instead.
+ */
+public class CreatedCollection<T> extends PCollectionImpl<T> implements SparkCollection {
+
+  /** The client-side values this collection was created from. */
+  private final Iterable<T> contents;
+  /** Type used to serialize {@link #contents} and to read them back. */
+  private final PType<T> ptype;
+  /** Parallelism from {@link CreateOptions#getParallelism()}; used as the number of slices. */
+  private final int parallelism;
+  /** Lazily built RDD; rebuilt whenever the runtime reports it is no longer valid. */
+  private JavaRDD<T> rdd;
+
+  public CreatedCollection(SparkPipeline p, Iterable<T> contents, PType<T> ptype, CreateOptions options) {
+    super(options.getName(), p);
+    this.contents = contents;
+    this.ptype = ptype;
+    this.parallelism = options.getParallelism();
+  }
+
+  @Override
+  protected void acceptInternal(Visitor visitor) {
+    // No-op: a created collection has no parents to visit.
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  protected ReadableData<T> getReadableDataInternal() {
+    try {
+      return ptype.createSourceTarget(getPipeline().getConfiguration(),
+          getPipeline().createTempPath(), contents, parallelism).asReadable();
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns the number of elements in {@link #contents} (an element count, not a byte count),
+   * computed by iterating the full contents.
+   */
+  @Override
+  protected long getSizeInternal() {
+    return Iterables.size(contents);
+  }
+
+  /** Returns -1; no modification time is tracked for created collections. */
+  @Override
+  public long getLastModifiedAt() {
+    return -1;
+  }
+
+  @Override
+  public PType<T> getPType() {
+    return ptype;
+  }
+
+  /**
+   * Returns the RDD for this collection, building it when the runtime reports the current one is
+   * not valid, naming it after this collection, and persisting it if the runtime has a storage
+   * level registered for this collection.
+   */
+  @Override
+  public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {
+    if (!runtime.isValid(rdd)) {
+      rdd = getJavaRDDLikeInternal(runtime);
+      rdd.rdd().setName(getName());
+      StorageLevel sl = runtime.getStorageLevel(this);
+      if (sl != null) {
+        rdd.rdd().persist(sl);
+      }
+    }
+    return rdd;
+  }
+
+  /**
+   * Serializes every value on the client, parallelizes the resulting byte arrays, and maps them
+   * back to {@code T} values inside the RDD.
+   */
+  private JavaRDD<T> getJavaRDDLikeInternal(SparkRuntime runtime) {
+    // Initialize the PType before building its SerDe or using its map functions.
+    ptype.initialize(runtime.getConfiguration());
+    SerDe serde = SerDeFactory.create(ptype, runtime.getConfiguration());
+    List<ByteArray> res = Lists.newArrayList();
+    try {
+      for (T value : contents) {
+        res.add(serde.toBytes(ptype.getOutputMapFn().map(value)));
+      }
+    } catch (Exception e) {
+      throw new CrunchRuntimeException(e);
+    }
+    return runtime.getSparkContext()
+        .parallelize(res, parallelism)
+        .map(new MapInputFn<T>(serde, ptype.getInputMapFn(), runtime.getRuntimeContext()));
+  }
+
+  /**
+   * Spark function that deserializes each {@link ByteArray} with the {@link SerDe} and applies the
+   * PType's input {@link MapFn}. The MapFn is initialized through the {@link SparkRuntimeContext}
+   * on the first call made on each instance of this function.
+   */
+  static class MapInputFn<T> implements Function<ByteArray, T> {
+
+    private final SerDe serde;
+    private final MapFn<Object, T> fn;
+    private final SparkRuntimeContext context;
+    private boolean initialized;
+
+    public MapInputFn(SerDe serde, MapFn<Object, T> fn, SparkRuntimeContext context) {
+      this.serde = serde;
+      this.fn = fn;
+      this.context = context;
+      this.initialized = false;
+    }
+
+    @Override
+    public T call(ByteArray byteArray) throws Exception {
+      if (!initialized) {
+        context.initialize(fn, -1);
+        initialized = true;
+      }
+      return fn.map(serde.fromBytes(byteArray.value));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/CreatedTable.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/CreatedTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/CreatedTable.java
new file mode 100644
index 0000000..fb18787
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/CreatedTable.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.crunch.CreateOptions;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.dist.collect.PTableBase;
+import org.apache.crunch.impl.spark.ByteArray;
+import org.apache.crunch.impl.spark.SparkCollection;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.crunch.impl.spark.SparkRuntimeContext;
+import org.apache.crunch.impl.spark.serde.SerDe;
+import org.apache.crunch.impl.spark.serde.SerDeFactory;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Represents a Spark-based PTable that was created from a Java {@code Iterable} of
+ * key-value pairs.
+ *
+ * <p>For Spark execution the keys and values are serialized separately on the client with
+ * {@link SerDe}s built from the table's key and value types, passed to
+ * {@code parallelizePairs}, and mapped back to {@code (K, V)} tuples inside the resulting RDD.
+ * When the data is read directly ({@link #getReadableDataInternal()}), the contents are handed to
+ * {@link PTableType#createSourceTarget} with a temporary path instead.
+ */
+public class CreatedTable<K, V> extends PTableBase<K, V> implements SparkCollection {
+
+  /** The client-side pairs this table was created from. */
+  private final Iterable<Pair<K, V>> contents;
+  /** Table type; its key and value types drive serialization. */
+  private final PTableType<K, V> ptype;
+  /** Parallelism from {@link CreateOptions#getParallelism()}; used as the number of slices. */
+  private final int parallelism;
+  /** Lazily built RDD; rebuilt whenever the runtime reports it is no longer valid. */
+  private JavaPairRDD<K, V> rdd;
+
+  public CreatedTable(
+      SparkPipeline pipeline,
+      Iterable<Pair<K, V>> contents,
+      PTableType<K, V> ptype,
+      CreateOptions options) {
+    super(options.getName(), pipeline);
+    this.contents = contents;
+    this.ptype = ptype;
+    this.parallelism = options.getParallelism();
+  }
+
+  @Override
+  protected void acceptInternal(Visitor visitor) {
+    // No-op: a created table has no parents to visit.
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  protected ReadableData<Pair<K, V>> getReadableDataInternal() {
+    try {
+      return ptype.createSourceTarget(getPipeline().getConfiguration(),
+          getPipeline().createTempPath(), contents, parallelism).asReadable();
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns the number of pairs in {@link #contents} (an element count, not a byte count),
+   * computed by iterating the full contents.
+   */
+  @Override
+  protected long getSizeInternal() {
+    return Iterables.size(contents);
+  }
+
+  /** Returns -1; no modification time is tracked for created tables. */
+  @Override
+  public long getLastModifiedAt() {
+    return -1;
+  }
+
+  @Override
+  public PTableType<K, V> getPTableType() {
+    return ptype;
+  }
+
+  @Override
+  public PType<Pair<K, V>> getPType() {
+    return ptype;
+  }
+
+  /**
+   * Returns the RDD for this table, building it when the runtime reports the current one is not
+   * valid, naming it after this table, and persisting it if the runtime has a storage level
+   * registered for this table.
+   */
+  @Override
+  public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {
+    if (!runtime.isValid(rdd)) {
+      rdd = getJavaRDDLikeInternal(runtime);
+      rdd.rdd().setName(getName());
+      StorageLevel sl = runtime.getStorageLevel(this);
+      if (sl != null) {
+        rdd.rdd().persist(sl);
+      }
+    }
+    return rdd;
+  }
+
+  /**
+   * Serializes each pair's key and value on the client, parallelizes the resulting byte-array
+   * pairs, and maps them back to {@code (K, V)} tuples inside the RDD.
+   */
+  private JavaPairRDD<K, V> getJavaRDDLikeInternal(SparkRuntime runtime) {
+    ptype.initialize(runtime.getConfiguration());
+    PType<K> keyType = ptype.getKeyType();
+    PType<V> valueType = ptype.getValueType();
+    SerDe keySerde = SerDeFactory.create(keyType, runtime.getConfiguration());
+    SerDe valueSerde = SerDeFactory.create(valueType, runtime.getConfiguration());
+    List<Tuple2<ByteArray, ByteArray>> res = Lists.newArrayList();
+    try {
+      for (Pair<K, V> p : contents) {
+        ByteArray key = keySerde.toBytes(keyType.getOutputMapFn().map(p.first()));
+        ByteArray value = valueSerde.toBytes(valueType.getOutputMapFn().map(p.second()));
+        res.add(new Tuple2<ByteArray, ByteArray>(key, value));
+      }
+    } catch (Exception e) {
+      throw new CrunchRuntimeException(e);
+    }
+    return runtime.getSparkContext()
+        .parallelizePairs(res, parallelism)
+        .mapToPair(new MapPairInputFn<K, V>(
+            keySerde, valueSerde, keyType.getInputMapFn(), valueType.getInputMapFn(),
+            runtime.getRuntimeContext()));
+  }
+
+  /**
+   * Spark function that deserializes the key and value {@link ByteArray}s with their respective
+   * {@link SerDe}s and applies the key and value types' input {@link MapFn}s. Both MapFns are
+   * initialized through the {@link SparkRuntimeContext} on the first call made on each instance of
+   * this function.
+   */
+  static class MapPairInputFn<K, V> implements PairFunction<Tuple2<ByteArray, ByteArray>, K, V> {
+
+    private final SerDe keySerde;
+    private final SerDe valueSerde;
+    private final MapFn<Object, K> keyFn;
+    private final MapFn<Object, V> valueFn;
+    private final SparkRuntimeContext context;
+    private boolean initialized;
+
+    public MapPairInputFn(
+        SerDe keySerde,
+        SerDe valueSerde,
+        MapFn<Object, K> keyFn,
+        MapFn<Object, V> valueFn,
+        SparkRuntimeContext context) {
+      this.keySerde = keySerde;
+      this.valueSerde = valueSerde;
+      this.keyFn = keyFn;
+      this.valueFn = valueFn;
+      this.context = context;
+      this.initialized = false;
+    }
+
+    @Override
+    public Tuple2<K, V> call(Tuple2<ByteArray, ByteArray> in) throws Exception {
+      if (!initialized) {
+        context.initialize(keyFn, -1);
+        context.initialize(valueFn, -1);
+        initialized = true;
+      }
+      return new Tuple2<K, V>(
+          keyFn.map(keySerde.fromBytes(in._1().value)),
+          valueFn.map(valueSerde.fromBytes(in._2().value)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
index 8273236..51e1eac 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
@@ -38,8 +38,8 @@ import java.io.IOException;
 
 public class InputCollection<S> extends BaseInputCollection<S> implements SparkCollection {
 
-  InputCollection(Source<S> source, DistributedPipeline pipeline, ParallelDoOptions doOpts) {
-    super(source, pipeline, doOpts);
+  InputCollection(Source<S> source, String named, DistributedPipeline pipeline, ParallelDoOptions doOpts) {
+    super(source, named, pipeline, doOpts);
   }
 
   public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {
@@ -53,7 +53,7 @@ public class InputCollection<S> extends BaseInputCollection<S> implements SparkC
           CrunchInputFormat.class,
           converter.getKeyClass(),
           converter.getValueClass());
-      input.rdd().setName(source.toString());
+      input.rdd().setName(getName());
       MapFn mapFn = converter.applyPTypeTransforms() ? source.getType().getInputMapFn() : IdentityFn.getInstance();
       return input
           .map(new InputConverterFunction(source.getConverter()))

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
index a0f7189..72963ec 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
@@ -37,8 +37,8 @@ import java.io.IOException;
 
 public class InputTable<K, V> extends BaseInputTable<K, V> implements SparkCollection {
 
-  public InputTable(TableSource<K, V> source, DistributedPipeline pipeline, ParallelDoOptions doOpts) {
-    super(source, pipeline, doOpts);
+  public InputTable(TableSource<K, V> source, String named, DistributedPipeline pipeline, ParallelDoOptions doOpts) {
+    super(source, named, pipeline, doOpts);
   }
 
   @Override
@@ -52,7 +52,7 @@ public class InputTable<K, V> extends BaseInputTable<K, V> implements SparkColle
           CrunchInputFormat.class,
           converter.getKeyClass(),
           converter.getValueClass());
-      input.rdd().setName(source.toString());
+      input.rdd().setName(getName());
       MapFn mapFn = converter.applyPTypeTransforms() ? source.getType().getInputMapFn() : IdentityFn.getInstance();
       return input
           .map(new InputConverterFunction(source.getConverter()))

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java
index 1421945..37f3254 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java
@@ -45,17 +45,19 @@ public class SparkCollectFactory implements PCollectionFactory {
   @Override
   public <S> BaseInputCollection<S> createInputCollection(
       Source<S> source,
+      String named,
       DistributedPipeline pipeline,
       ParallelDoOptions doOpts) {
-    return new InputCollection<S>(source, pipeline, doOpts);
+    return new InputCollection<S>(source, named, pipeline, doOpts);
   }
 
   @Override
   public <K, V> BaseInputTable<K, V> createInputTable(
       TableSource<K, V> source,
+      String named,
       DistributedPipeline pipeline,
       ParallelDoOptions doOpts) {
-    return new InputTable<K, V>(source, pipeline, doOpts);
+    return new InputTable<K, V>(source, named, pipeline, doOpts);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java
index cef60e3..5b21851 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java
@@ -18,18 +18,12 @@
 package org.apache.crunch.impl.spark.serde;
 
 import com.google.common.base.Function;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.crunch.impl.spark.ByteArray;
 import org.apache.crunch.impl.spark.ByteArrayHelper;
 import org.apache.crunch.types.avro.AvroMode;

http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDeFactory.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDeFactory.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDeFactory.java
new file mode 100644
index 0000000..afab611
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDeFactory.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.serde;
+
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroMode;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.writable.WritableType;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/**
+ * Creates the {@link SerDe} used to convert values of a {@link PType} to and from bytes.
+ */
+public class SerDeFactory {
+
+  private SerDeFactory() {
+    // Static factory only; not meant to be instantiated.
+  }
+
+  /**
+   * Returns a {@link SerDe} for the given type.
+   *
+   * <p>Writable-family types get a {@link WritableSerDe} built from the type's serialization
+   * class; {@link AvroType}s get an {@link AvroSerDe} configured with the Avro mode properties
+   * resolved against {@code conf}.
+   *
+   * @param ptype the type to serialize; must be a {@link WritableType} or an {@link AvroType}
+   * @param conf the configuration used to resolve the Avro mode for Avro types
+   * @return a SerDe for {@code ptype}
+   * @throws IllegalArgumentException if {@code ptype} is neither a {@link WritableType} nor an
+   *     {@link AvroType}
+   */
+  public static SerDe create(PType<?> ptype, Configuration conf) {
+    if (WritableTypeFamily.getInstance().equals(ptype.getFamily())) {
+      // Check before casting so an unsupported type fails with a descriptive message instead of
+      // a bare ClassCastException.
+      if (!(ptype instanceof WritableType)) {
+        throw new IllegalArgumentException(
+            "Writable-family PType is not a WritableType and has no SerDe: " + ptype);
+      }
+      return new WritableSerDe(((WritableType) ptype).getSerializationClass());
+    }
+    if (!(ptype instanceof AvroType)) {
+      throw new IllegalArgumentException("No SerDe available for PType: " + ptype);
+    }
+    AvroType at = (AvroType) ptype;
+    Map<String, String> props = AvroMode.fromType(at).withFactoryFromConfiguration(conf).getModeProperties();
+    return new AvroSerDe(at, props);
+  }
+}


Mime
View raw message