crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-315: Add support for Empty PCollections/PTables.
Date Sun, 29 Dec 2013 19:51:20 GMT
Updated Branches:
  refs/heads/master 58eb227d7 -> 5d666fe97


CRUNCH-315: Add support for Empty PCollections/PTables.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5d666fe9
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5d666fe9
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5d666fe9

Branch: refs/heads/master
Commit: 5d666fe97b2273d17accaa9ec1adcb8cbee41885
Parents: 58eb227
Author: Josh Wills <jwills@apache.org>
Authored: Wed Dec 25 21:43:34 2013 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Sun Dec 29 11:48:14 2013 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/EmptyPCollectionIT.java   | 83 ++++++++++++++++++++
 .../main/java/org/apache/crunch/Pipeline.java   |  6 ++
 .../crunch/impl/dist/DistributedPipeline.java   | 20 +++--
 .../impl/dist/collect/EmptyPCollection.java     | 67 ++++++++++++++++
 .../crunch/impl/dist/collect/EmptyPTable.java   | 72 +++++++++++++++++
 .../impl/dist/collect/EmptyReadableData.java    | 45 +++++++++++
 .../org/apache/crunch/impl/mem/MemPipeline.java | 10 +++
 .../crunch/impl/mr/MRPipelineExecution.java     |  2 -
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 17 +++-
 .../java/org/apache/crunch/io/CrunchInputs.java |  7 +-
 .../apache/crunch/SparkEmptyPCollectionIT.java  | 83 ++++++++++++++++++++
 .../apache/crunch/impl/spark/SparkPipeline.java | 15 ++++
 .../impl/spark/collect/EmptyPCollection.java    | 38 +++++++++
 .../crunch/impl/spark/collect/EmptyPTable.java  | 38 +++++++++
 14 files changed, 488 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/it/java/org/apache/crunch/EmptyPCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/EmptyPCollectionIT.java b/crunch-core/src/it/java/org/apache/crunch/EmptyPCollectionIT.java
new file mode 100644
index 0000000..2e5a8c3
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/EmptyPCollectionIT.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.Iterables;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for empty PCollections/PTables on an {@link MRPipeline}: an empty input
+ * run through parallelDo/groupByKey/combineValues must materialize to nothing, and a union
+ * of an empty collection or table with real text input must produce a non-empty result.
+ */
+public class EmptyPCollectionIT extends CrunchTestSupport implements Serializable {
+
+  /** Splits each input line on runs of whitespace and emits a (word, 1L) pair per token. */
+  private static class SplitFn extends DoFn<String, Pair<String, Long>> {
+    @Override
+    public void process(String input, Emitter<Pair<String, Long>> emitter) {
+      for (String word : input.split("\\s+")) {
+        emitter.emit(Pair.of(word, 1L));
+      }
+    }
+  }
+
+  // An empty PCollection pushed through a full map/group/combine chain materializes to nothing.
+  @Test
+  public void testEmptyMR() throws Exception {
+    MRPipeline p = new MRPipeline(EmptyPCollectionIT.class, tempDir.getDefaultConfiguration());
+    assertTrue(Iterables.isEmpty(p.emptyPCollection(Writables.strings())
+        .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()))
+        .groupByKey()
+        .combineValues(Aggregators.SUM_LONGS())
+        .materialize()));
+    p.done();
+  }
+
+  // A table derived from an empty PCollection, unioned with word pairs read from shakes.txt,
+  // still yields word counts after grouping and combining.
+  @Test
+  public void testUnionWithEmptyMR() throws Exception {
+    MRPipeline p = new MRPipeline(EmptyPCollectionIT.class, tempDir.getDefaultConfiguration());
+    assertFalse(Iterables.isEmpty(p.emptyPCollection(Writables.strings())
+        .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()))
+        .union(
+            p.read(From.textFile(tempDir.copyResourceFileName("shakes.txt")))
+                .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs())))
+        .groupByKey()
+        .combineValues(Aggregators.SUM_LONGS())
+        .materialize()));
+    p.done();
+  }
+
+  // Same as above, but the empty side is created directly with emptyPTable.
+  @Test
+  public void testUnionTableWithEmptyMR() throws Exception {
+    MRPipeline p = new MRPipeline(EmptyPCollectionIT.class, tempDir.getDefaultConfiguration());
+    assertFalse(Iterables.isEmpty(p.emptyPTable(Writables.tableOf(Writables.strings(), Writables.longs()))
+        .union(
+            p.read(From.textFile(tempDir.copyResourceFileName("shakes.txt")))
+                .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs())))
+        .groupByKey()
+        .combineValues(Aggregators.SUM_LONGS())
+        .materialize()));
+    p.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
index 3b0bac2..f34d0ef 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
@@ -17,6 +17,8 @@
  */
 package org.apache.crunch;
 
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -109,6 +111,10 @@ public interface Pipeline {
    */
   <T> void cache(PCollection<T> pcollection, CachingOptions options);
 
+  /**
+   * Creates a {@code PCollection} of the given type that contains no elements. It can be
+   * used wherever a {@code PCollection} is expected, for example as an operand of
+   * {@code union}.
+   *
+   * @param ptype the element type of the returned collection
+   * @return a {@code PCollection} with no elements
+   */
+  <T> PCollection<T> emptyPCollection(PType<T> ptype);
+
+  /**
+   * Creates a {@code PTable} of the given table type that contains no entries. It can be
+   * used wherever a {@code PTable} is expected, for example as an operand of {@code union}.
+   *
+   * @param ptype the key/value table type of the returned table
+   * @return a {@code PTable} with no entries
+   */
+  <K, V> PTable<K, V> emptyPTable(PTableType<K, V> ptype);
+
   /**
    * Constructs and executes a series of MapReduce jobs in order to write data
    * to the output targets.

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
index 28dbaec..82517f3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
@@ -37,6 +37,8 @@ import org.apache.crunch.impl.dist.collect.BaseInputTable;
 import org.apache.crunch.impl.dist.collect.BaseGroupedTable;
 import org.apache.crunch.impl.dist.collect.BaseUnionCollection;
 import org.apache.crunch.impl.dist.collect.BaseUnionTable;
+import org.apache.crunch.impl.dist.collect.EmptyPCollection;
+import org.apache.crunch.impl.dist.collect.EmptyPTable;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.dist.collect.PCollectionFactory;
 import org.apache.crunch.io.From;
@@ -44,6 +46,7 @@ import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.crunch.io.To;
 import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
@@ -170,20 +173,15 @@ public abstract class DistributedPipeline implements Pipeline {
     outputTargets.get(impl).add(target);
   }
 
-  // TODO: sort this out
-  /*
+  /**
+   * Returns an {@link EmptyPCollection} of the given type that is attached to this pipeline.
+   */
   @Override
-  public <T> Iterable<T> materialize(PCollection<T> pcollection) {
-    C pcollectionImpl = toPCollectionImpl(pcollection);
-    ReadableSource<?> readableSrc = getMaterializeSourceTarget(pcollectionImpl);
+  public <S> PCollection<S> emptyPCollection(PType<S> ptype) {
+    return new EmptyPCollection<S>(this, ptype);
+  }

-    MaterializableIterable<?> c = new MaterializableIterable(this, readableSrc);
-    if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
-      outputTargetsToMaterialize.put(pcollectionImpl, c);
-    }
-    return (Iterable<T>) c;
+  /**
+   * Returns an {@link EmptyPTable} of the given table type that is attached to this pipeline.
+   */
+  @Override
+  public <K, V> PTable<K, V> emptyPTable(PTableType<K, V> ptype) {
+    return new EmptyPTable<K, V>(this, ptype);
   }
-  */
 
   /**
    * Retrieve a ReadableSourceTarget that provides access to the contents of a {@link PCollection}.

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPCollection.java
new file mode 100644
index 0000000..bc2d141
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPCollection.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.types.PType;
+
+import java.util.List;
+
+/**
+ * A {@link PCollectionImpl} with no data and no parents, used to back
+ * {@code Pipeline#emptyPCollection} in distributed pipelines. Reading it yields an empty
+ * iterable, and it reports a size and last-modified time of zero.
+ */
+public class EmptyPCollection<T> extends PCollectionImpl<T> {
+
+  private final PType<T> ptype;
+
+  /**
+   * @param pipeline the pipeline this collection is created in
+   * @param ptype the element type; must not be null
+   * @throws NullPointerException if {@code ptype} is null
+   */
+  public EmptyPCollection(DistributedPipeline pipeline, PType<T> ptype) {
+    super("EMPTY", pipeline);
+    this.ptype = Preconditions.checkNotNull(ptype);
+  }
+
+  @Override
+  protected void acceptInternal(Visitor visitor) {
+    // No-op: there are no parent collections to visit.
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  protected ReadableData<T> getReadableDataInternal() {
+    // Reading yields an empty iterable and depends on no source targets.
+    return new EmptyReadableData<T>();
+  }
+
+  @Override
+  protected long getSizeInternal() {
+    return 0;
+  }
+
+  @Override
+  public long getLastModifiedAt() {
+    // No backing data, so there is no meaningful modification time to report.
+    return 0;
+  }
+
+  @Override
+  public PType<T> getPType() {
+    return ptype;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPTable.java
new file mode 100644
index 0000000..6b8c516
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPTable.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import java.util.List;
+
+/**
+ * A {@link PTableBase} with no entries and no parents, used to back
+ * {@code Pipeline#emptyPTable} in distributed pipelines. Reading it yields an empty
+ * iterable, and it reports a size and last-modified time of zero.
+ */
+public class EmptyPTable<K, V> extends PTableBase<K, V> {
+
+  private final PTableType<K, V> ptype;
+
+  /**
+   * @param pipeline the pipeline this table is created in
+   * @param ptype the key/value table type; must not be null
+   * @throws NullPointerException if {@code ptype} is null
+   */
+  public EmptyPTable(DistributedPipeline pipeline, PTableType<K, V> ptype) {
+    super("EMPTY", pipeline);
+    // Fail fast, as EmptyPCollection does; otherwise a null type would be handed out later
+    // by getPTableType()/getPType() and fail far from where it was introduced.
+    this.ptype = Preconditions.checkNotNull(ptype);
+  }
+
+  @Override
+  protected void acceptInternal(Visitor visitor) {
+    // No-op: there are no parent collections to visit.
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  protected ReadableData<Pair<K, V>> getReadableDataInternal() {
+    // Reading yields an empty iterable and depends on no source targets.
+    return new EmptyReadableData<Pair<K, V>>();
+  }
+
+  @Override
+  protected long getSizeInternal() {
+    return 0;
+  }
+
+  @Override
+  public long getLastModifiedAt() {
+    // No backing data, so there is no meaningful modification time to report.
+    return 0;
+  }
+
+  @Override
+  public PTableType<K, V> getPTableType() {
+    return ptype;
+  }
+
+  @Override
+  public PType<Pair<K, V>> getPType() {
+    // The table type doubles as the PType of the table's Pair<K, V> elements.
+    return ptype;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyReadableData.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyReadableData.java
new file mode 100644
index 0000000..65825d4
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyReadableData.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * {@link ReadableData} with no contents: it has no source targets, needs no configuration,
+ * and always reads back an empty iterable.
+ */
+class EmptyReadableData<T> implements ReadableData<T> {
+
+  @Override
+  public Set<SourceTarget<?>> getSourceTargets() {
+    // Nothing is read from storage, so there are no source targets.
+    return ImmutableSet.of();
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    // Intentionally empty: there is no input to configure.
+  }
+
+  @Override
+  public Iterable<T> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException {
+    return ImmutableList.of();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index ced1700..7ef9f4f 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -312,6 +312,16 @@ public class MemPipeline implements Pipeline {
   }
 
+  /** Returns a collection of the given type built from an empty list via typedCollectionOf. */
   @Override
+  public <T> PCollection<T> emptyPCollection(PType<T> ptype) {
+    return typedCollectionOf(ptype, ImmutableList.<T>of());
+  }
+
+  /** Returns a table of the given table type built from an empty list via typedTableOf. */
+  @Override
+  public <K, V> PTable<K, V> emptyPTable(PTableType<K, V> ptype) {
+    return typedTableOf(ptype, ImmutableList.<Pair<K, V>>of());
+  }
+
+  @Override
   public PipelineExecution runAsync() {
     activeTargets.clear();
     return new MemExecution();

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
index b9d53fe..b7df522 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
@@ -22,7 +22,5 @@ import org.apache.crunch.PipelineExecution;
 import java.util.List;
 
+/**
+ * A {@link PipelineExecution} that also exposes the {@link MRJob}s backing it.
+ */
 public interface MRPipelineExecution extends PipelineExecution {
-
+    /** Returns the {@link MRJob}s that make up this pipeline execution. */
     List<MRJob> getJobs();
-
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 97ac866..bce7010 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
@@ -42,6 +44,8 @@ import com.google.common.collect.Sets;
 
 public class MSCRPlanner {
 
+  private static final Log LOG = LogFactory.getLog(MSCRPlanner.class);
+
   private final MRPipeline pipeline;
   private final Map<PCollectionImpl<?>, Set<Target>> outputs;
   private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
@@ -98,7 +102,18 @@ public class MSCRPlanner {
       }
       
       Graph baseGraph = graphBuilder.getGraph();
-      
+      boolean hasInputs = false;
+      for (Vertex v : baseGraph) {
+        if (v.isInput()) {
+          hasInputs = true;
+          break;
+        }
+      }
+      if (!hasInputs) {
+        LOG.warn("No input sources for pipeline, nothing to do...");
+        return new MRExecutor(conf, jarClass, outputs, toMaterialize);
+      }
+
       // Create a new graph that splits up up dependent GBK nodes.
       Graph graph = prepareFinalGraph(baseGraph);
       

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
index c1a0eef..bcdcb55 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
@@ -20,6 +20,7 @@ package org.apache.crunch.io;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -53,7 +54,11 @@ public class CrunchInputs {
   public static Map<FormatBundle, Map<Integer, List<Path>>> getFormatNodeMap(JobContext
job) {
     Map<FormatBundle, Map<Integer, List<Path>>> formatNodeMap = Maps.newHashMap();
     Configuration conf = job.getConfiguration();
-    for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_INPUTS))) {
+    String crunchInputs = conf.get(CRUNCH_INPUTS);
+    if (crunchInputs == null || crunchInputs.isEmpty()) {
+      return ImmutableMap.of();
+    }
+    for (String input : Splitter.on(RECORD_SEP).split(crunchInputs)) {
       List<String> fields = Lists.newArrayList(SPLITTER.split(input));
       FormatBundle<InputFormat> inputBundle = FormatBundle.fromSerialized(fields.get(0),
job.getConfiguration());
       if (!formatNodeMap.containsKey(inputBundle)) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-spark/src/it/java/org/apache/crunch/SparkEmptyPCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkEmptyPCollectionIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkEmptyPCollectionIT.java
new file mode 100644
index 0000000..3137252
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkEmptyPCollectionIT.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.Iterables;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Spark counterpart of EmptyPCollectionIT: an empty input run through
+ * parallelDo/groupByKey/combineValues must materialize to nothing, and a union of an empty
+ * collection or table with real text input must produce a non-empty result.
+ *
+ * <p>Each test calls {@code done()} in a {@code finally} block so the pipeline is cleaned up
+ * even when an assertion fails; a leaked local Spark context can otherwise interfere with
+ * the tests that run after it.
+ * NOTE(review): the {@code *MR} test names mirror EmptyPCollectionIT although these run on Spark.
+ */
+public class SparkEmptyPCollectionIT {
+  /** Splits each input line on runs of whitespace and emits a (word, 1L) pair per token. */
+  private static class SplitFn extends DoFn<String, Pair<String, Long>> {
+    @Override
+    public void process(String input, Emitter<Pair<String, Long>> emitter) {
+      for (String word : input.split("\\s+")) {
+        emitter.emit(Pair.of(word, 1L));
+      }
+    }
+  }
+
+  @Rule
+  public TemporaryPath tempDir = new TemporaryPath();
+
+  // An empty PCollection pushed through a full map/group/combine chain materializes to nothing.
+  @Test
+  public void testEmptyMR() throws Exception {
+    Pipeline p = new SparkPipeline("local", "empty");
+    try {
+      assertTrue(Iterables.isEmpty(p.emptyPCollection(Writables.strings())
+          .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()))
+          .groupByKey()
+          .combineValues(Aggregators.SUM_LONGS())
+          .materialize()));
+    } finally {
+      p.done();
+    }
+  }
+
+  // A table derived from an empty PCollection, unioned with word pairs read from shakes.txt,
+  // still yields word counts after grouping and combining.
+  @Test
+  public void testUnionWithEmptyMR() throws Exception {
+    Pipeline p = new SparkPipeline("local", "empty");
+    try {
+      assertFalse(Iterables.isEmpty(p.emptyPCollection(Writables.strings())
+          .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()))
+          .union(
+              p.read(From.textFile(tempDir.copyResourceFileName("shakes.txt")))
+                  .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs())))
+          .groupByKey()
+          .combineValues(Aggregators.SUM_LONGS())
+          .materialize()));
+    } finally {
+      p.done();
+    }
+  }
+
+  // Same as above, but the empty side is created directly with emptyPTable.
+  @Test
+  public void testUnionTableWithEmptyMR() throws Exception {
+    Pipeline p = new SparkPipeline("local", "empty");
+    try {
+      assertFalse(Iterables.isEmpty(p.emptyPTable(Writables.tableOf(Writables.strings(), Writables.longs()))
+          .union(
+              p.read(From.textFile(tempDir.copyResourceFileName("shakes.txt")))
+                  .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs())))
+          .groupByKey()
+          .combineValues(Aggregators.SUM_LONGS())
+          .materialize()));
+    } finally {
+      p.done();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
index 674f0c8..49e1d35 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
@@ -21,13 +21,18 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
 import org.apache.crunch.PipelineExecution;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.spark.collect.EmptyPCollection;
+import org.apache.crunch.impl.spark.collect.EmptyPTable;
 import org.apache.crunch.impl.spark.collect.SparkCollectFactory;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.storage.StorageLevel;
@@ -62,6 +67,16 @@ public class SparkPipeline extends DistributedPipeline {
   }
 
+  /**
+   * Returns the Spark-specific {@link EmptyPCollection}, which implements
+   * {@code SparkCollection} so the Spark runtime can obtain an empty RDD for it.
+   */
   @Override
+  public <S> PCollection<S> emptyPCollection(PType<S> ptype) {
+    return new EmptyPCollection<S>(this, ptype);
+  }
+
+  /**
+   * Returns the Spark-specific {@link EmptyPTable}, which implements
+   * {@code SparkCollection} so the Spark runtime can obtain an empty pair RDD for it.
+   */
+  @Override
+  public <K, V> PTable<K, V> emptyPTable(PTableType<K, V> ptype) {
+    return new EmptyPTable<K, V>(this, ptype);
+  }
+
+  @Override
   public <T> void cache(PCollection<T> pcollection, CachingOptions options) {
     cachedCollections.put(pcollection, toStorageLevel(options));
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPCollection.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPCollection.java
new file mode 100644
index 0000000..7a298fb
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPCollection.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.impl.spark.SparkCollection;
+import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.crunch.types.PType;
+import org.apache.spark.api.java.JavaRDDLike;
+
+/**
+ * Spark variant of the distributed {@code EmptyPCollection}: it also implements
+ * {@link SparkCollection} so the Spark runtime can turn it into an empty RDD.
+ */
+public class EmptyPCollection<T> extends org.apache.crunch.impl.dist.collect.EmptyPCollection<T>
+    implements SparkCollection {
+
+  public EmptyPCollection(DistributedPipeline pipeline, PType<T> ptype) {
+    super(pipeline, ptype);
+  }
+
+  @Override
+  public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {
+    // Parallelize an empty list on the runtime's Spark context to get an RDD with no elements.
+    return runtime.getSparkContext().parallelize(ImmutableList.of());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPTable.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPTable.java
new file mode 100644
index 0000000..97d42fd
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPTable.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.impl.spark.SparkCollection;
+import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.crunch.types.PTableType;
+import org.apache.spark.api.java.JavaRDDLike;
+import scala.Tuple2;
+
+/**
+ * Spark variant of the distributed {@code EmptyPTable}: it also implements
+ * {@link SparkCollection} so the Spark runtime can turn it into an empty pair RDD.
+ */
+public class EmptyPTable<K, V> extends org.apache.crunch.impl.dist.collect.EmptyPTable<K, V>
+    implements SparkCollection {
+
+  public EmptyPTable(DistributedPipeline pipeline, PTableType<K, V> ptype) {
+    super(pipeline, ptype);
+  }
+
+  @Override
+  public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {
+    // Parallelize an empty list of key/value tuples to get a pair RDD with no elements.
+    return runtime.getSparkContext().parallelizePairs(ImmutableList.<Tuple2<K, V>>of());
+  }
+}


Mime
View raw message