Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5892710AD4 for ; Sun, 29 Dec 2013 19:51:21 +0000 (UTC) Received: (qmail 24861 invoked by uid 500); 29 Dec 2013 19:51:21 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 24837 invoked by uid 500); 29 Dec 2013 19:51:20 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 24830 invoked by uid 99); 29 Dec 2013 19:51:20 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 29 Dec 2013 19:51:20 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 44B5288C18A; Sun, 29 Dec 2013 19:51:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-315: Add support for Empty PCollections/PTables. Date: Sun, 29 Dec 2013 19:51:20 +0000 (UTC) Updated Branches: refs/heads/master 58eb227d7 -> 5d666fe97 CRUNCH-315: Add support for Empty PCollections/PTables. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5d666fe9 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5d666fe9 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5d666fe9 Branch: refs/heads/master Commit: 5d666fe97b2273d17accaa9ec1adcb8cbee41885 Parents: 58eb227 Author: Josh Wills Authored: Wed Dec 25 21:43:34 2013 -0800 Committer: Josh Wills Committed: Sun Dec 29 11:48:14 2013 -0800 ---------------------------------------------------------------------- .../org/apache/crunch/EmptyPCollectionIT.java | 83 ++++++++++++++++++++ .../main/java/org/apache/crunch/Pipeline.java | 6 ++ .../crunch/impl/dist/DistributedPipeline.java | 20 +++-- .../impl/dist/collect/EmptyPCollection.java | 67 ++++++++++++++++ .../crunch/impl/dist/collect/EmptyPTable.java | 72 +++++++++++++++++ .../impl/dist/collect/EmptyReadableData.java | 45 +++++++++++ .../org/apache/crunch/impl/mem/MemPipeline.java | 10 +++ .../crunch/impl/mr/MRPipelineExecution.java | 2 - .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 17 +++- .../java/org/apache/crunch/io/CrunchInputs.java | 7 +- .../apache/crunch/SparkEmptyPCollectionIT.java | 83 ++++++++++++++++++++ .../apache/crunch/impl/spark/SparkPipeline.java | 15 ++++ .../impl/spark/collect/EmptyPCollection.java | 38 +++++++++ .../crunch/impl/spark/collect/EmptyPTable.java | 38 +++++++++ 14 files changed, 488 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/it/java/org/apache/crunch/EmptyPCollectionIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/EmptyPCollectionIT.java b/crunch-core/src/it/java/org/apache/crunch/EmptyPCollectionIT.java new file mode 100644 index 0000000..2e5a8c3 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/EmptyPCollectionIT.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import com.google.common.collect.Iterables; +import org.apache.crunch.fn.Aggregators; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.io.To; +import org.apache.crunch.test.CrunchTestSupport; +import org.apache.crunch.types.writable.Writables; +import org.junit.Test; + +import java.io.Serializable; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class EmptyPCollectionIT extends CrunchTestSupport implements Serializable { + + private static class SplitFn extends DoFn> { + @Override + public void process(String input, Emitter> emitter) { + for (String word : input.split("\\s+")) { + emitter.emit(Pair.of(word, 1L)); + } + } + } + + @Test + public void testEmptyMR() throws Exception { + MRPipeline p = new MRPipeline(EmptyPCollectionIT.class, tempDir.getDefaultConfiguration()); + assertTrue(Iterables.isEmpty(p.emptyPCollection(Writables.strings()) + .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs())) + .groupByKey() + .combineValues(Aggregators.SUM_LONGS()) + .materialize())); + p.done(); + } + + @Test + public void testUnionWithEmptyMR() throws Exception { + MRPipeline p = new MRPipeline(EmptyPCollectionIT.class, tempDir.getDefaultConfiguration()); + assertFalse(Iterables.isEmpty(p.emptyPCollection(Writables.strings()) + .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs())) + .union( + p.read(From.textFile(tempDir.copyResourceFileName("shakes.txt"))) + .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()))) + .groupByKey() + .combineValues(Aggregators.SUM_LONGS()) + .materialize())); + p.done(); + } + + @Test + public void testUnionTableWithEmptyMR() throws Exception { + MRPipeline p = new MRPipeline(EmptyPCollectionIT.class, tempDir.getDefaultConfiguration()); + assertFalse(Iterables.isEmpty(p.emptyPTable(Writables.tableOf(Writables.strings(), Writables.longs())) + .union( + p.read(From.textFile(tempDir.copyResourceFileName("shakes.txt"))) + .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()))) + .groupByKey() + .combineValues(Aggregators.SUM_LONGS()) + .materialize())); + p.done(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/Pipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java index 3b0bac2..f34d0ef 100644 --- a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java @@ -17,6 +17,8 @@ */ package org.apache.crunch; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; /** @@ -109,6 +111,10 @@ public interface Pipeline { */ void cache(PCollection pcollection, CachingOptions options); + PCollection emptyPCollection(PType ptype); + + PTable emptyPTable(PTableType ptype); + /** * Constructs and executes a series of MapReduce jobs in order to write data * to the output targets. http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java index 28dbaec..82517f3 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java @@ -37,6 +37,8 @@ import org.apache.crunch.impl.dist.collect.BaseInputTable; import org.apache.crunch.impl.dist.collect.BaseGroupedTable; import org.apache.crunch.impl.dist.collect.BaseUnionCollection; import org.apache.crunch.impl.dist.collect.BaseUnionTable; +import org.apache.crunch.impl.dist.collect.EmptyPCollection; +import org.apache.crunch.impl.dist.collect.EmptyPTable; import org.apache.crunch.impl.dist.collect.PCollectionImpl; import org.apache.crunch.impl.dist.collect.PCollectionFactory; import org.apache.crunch.io.From; @@ -44,6 +46,7 @@ import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.ReadableSourceTarget; import org.apache.crunch.io.To; import org.apache.crunch.materialize.MaterializableIterable; +import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.conf.Configuration; @@ -170,20 +173,15 @@ public abstract class DistributedPipeline implements Pipeline { outputTargets.get(impl).add(target); } - // TODO: sort this out - /* @Override - public Iterable materialize(PCollection pcollection) { - C pcollectionImpl = toPCollectionImpl(pcollection); - ReadableSource readableSrc = getMaterializeSourceTarget(pcollectionImpl); + public PCollection emptyPCollection(PType ptype) { + return new EmptyPCollection(this, ptype); + } - MaterializableIterable c = new MaterializableIterable(this, readableSrc); - if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) { - outputTargetsToMaterialize.put(pcollectionImpl, c); - } - return (Iterable) c; + @Override + public PTable emptyPTable(PTableType ptype) { + return new EmptyPTable(this, ptype); } - */ /** * Retrieve a ReadableSourceTarget that provides access to the contents of a {@link PCollection}. http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPCollection.java new file mode 100644 index 0000000..bc2d141 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPCollection.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.dist.collect; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.apache.crunch.ReadableData; +import org.apache.crunch.impl.dist.DistributedPipeline; +import org.apache.crunch.types.PType; + +import java.util.List; + +public class EmptyPCollection extends PCollectionImpl { + + private final PType ptype; + + public EmptyPCollection(DistributedPipeline pipeline, PType ptype) { + super("EMPTY", pipeline); + this.ptype = Preconditions.checkNotNull(ptype); + } + + @Override + protected void acceptInternal(Visitor visitor) { + // No-op + } + + @Override + public List> getParents() { + return ImmutableList.of(); + } + + @Override + protected ReadableData getReadableDataInternal() { + return new EmptyReadableData(); + } + + @Override + protected long getSizeInternal() { + return 0; + } + + @Override + public long getLastModifiedAt() { + return 0; + } + + @Override + public PType getPType() { + return ptype; + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPTable.java new file mode 100644 index 0000000..6b8c516 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPTable.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.dist.collect; + +import com.google.common.collect.ImmutableList; +import org.apache.crunch.Pair; +import org.apache.crunch.ReadableData; +import org.apache.crunch.impl.dist.DistributedPipeline; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; + +import java.util.List; + +public class EmptyPTable extends PTableBase { + + private final PTableType ptype; + + public EmptyPTable(DistributedPipeline pipeline, PTableType ptype) { + super("EMPTY", pipeline); + this.ptype = ptype; + } + + @Override + protected void acceptInternal(Visitor visitor) { + // No-op + } + + @Override + public List> getParents() { + return ImmutableList.of(); + } + + @Override + protected ReadableData> getReadableDataInternal() { + return new EmptyReadableData>(); + } + + @Override + protected long getSizeInternal() { + return 0; + } + + @Override + public long getLastModifiedAt() { + return 0; + } + + @Override + public PTableType getPTableType() { + return ptype; + } + + @Override + public PType> getPType() { + return ptype; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyReadableData.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyReadableData.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyReadableData.java new file mode 100644 index 0000000..65825d4 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyReadableData.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.dist.collect; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.crunch.ReadableData; +import org.apache.crunch.SourceTarget; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; + +import java.io.IOException; +import java.util.Set; + +class EmptyReadableData implements ReadableData { + + @Override + public Set> getSourceTargets() { + return ImmutableSet.of(); + } + + @Override + public void configure(Configuration conf) { + } + + @Override + public Iterable read(TaskInputOutputContext context) throws IOException { + return ImmutableList.of(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index ced1700..7ef9f4f 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -312,6 +312,16 @@ public class MemPipeline implements Pipeline { } @Override + public PCollection emptyPCollection(PType ptype) { + return typedCollectionOf(ptype, ImmutableList.of()); + } + + @Override + public PTable emptyPTable(PTableType ptype) { + return typedTableOf(ptype, ImmutableList.>of()); + } + + @Override public PipelineExecution runAsync() { activeTargets.clear(); return new MemExecution(); http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java index b9d53fe..b7df522 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java @@ -22,7 +22,5 @@ import org.apache.crunch.PipelineExecution; import java.util.List; public interface MRPipelineExecution extends PipelineExecution { - List getJobs(); - } http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index 97ac866..bce7010 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; import org.apache.crunch.impl.dist.collect.PCollectionImpl; @@ -42,6 +44,8 @@ import com.google.common.collect.Sets; public class MSCRPlanner { + private static final Log LOG = LogFactory.getLog(MSCRPlanner.class); + private final MRPipeline pipeline; private final Map, Set> outputs; private final Map, MaterializableIterable> toMaterialize; @@ -98,7 +102,18 @@ public class MSCRPlanner { } Graph baseGraph = graphBuilder.getGraph(); - + boolean hasInputs = false; + for (Vertex v : baseGraph) { + if (v.isInput()) { + hasInputs = true; + break; + } + } + if (!hasInputs) { + LOG.warn("No input sources for pipeline, nothing to do..."); + return new MRExecutor(conf, jarClass, outputs, toMaterialize); + } + // Create a new graph that splits up up dependent GBK nodes. Graph graph = prepareFinalGraph(baseGraph); http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java index c1a0eef..bcdcb55 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java @@ -20,6 +20,7 @@ package org.apache.crunch.io; import java.util.List; import java.util.Map; +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; @@ -53,7 +54,11 @@ public class CrunchInputs { public static Map>> getFormatNodeMap(JobContext job) { Map>> formatNodeMap = Maps.newHashMap(); Configuration conf = job.getConfiguration(); - for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_INPUTS))) { + String crunchInputs = conf.get(CRUNCH_INPUTS); + if (crunchInputs == null || crunchInputs.isEmpty()) { + return ImmutableMap.of(); + } + for (String input : Splitter.on(RECORD_SEP).split(crunchInputs)) { List fields = Lists.newArrayList(SPLITTER.split(input)); FormatBundle inputBundle = FormatBundle.fromSerialized(fields.get(0), job.getConfiguration()); if (!formatNodeMap.containsKey(inputBundle)) { http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-spark/src/it/java/org/apache/crunch/SparkEmptyPCollectionIT.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkEmptyPCollectionIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkEmptyPCollectionIT.java new file mode 100644 index 0000000..3137252 --- /dev/null +++ b/crunch-spark/src/it/java/org/apache/crunch/SparkEmptyPCollectionIT.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import com.google.common.collect.Iterables; +import org.apache.crunch.fn.Aggregators; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.impl.spark.SparkPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.types.writable.Writables; +import org.junit.Rule; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SparkEmptyPCollectionIT { + private static class SplitFn extends DoFn> { + @Override + public void process(String input, Emitter> emitter) { + for (String word : input.split("\\s+")) { + emitter.emit(Pair.of(word, 1L)); + } + } + } + + @Rule + public TemporaryPath tempDir = new TemporaryPath(); + + @Test + public void testEmptyMR() throws Exception { + Pipeline p = new SparkPipeline("local", "empty"); + assertTrue(Iterables.isEmpty(p.emptyPCollection(Writables.strings()) + .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs())) + .groupByKey() + .combineValues(Aggregators.SUM_LONGS()) + .materialize())); + p.done(); + } + + @Test + public void testUnionWithEmptyMR() throws Exception { + Pipeline p = new SparkPipeline("local", "empty"); + assertFalse(Iterables.isEmpty(p.emptyPCollection(Writables.strings()) + .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs())) + .union( + p.read(From.textFile(tempDir.copyResourceFileName("shakes.txt"))) + .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()))) + .groupByKey() + .combineValues(Aggregators.SUM_LONGS()) + .materialize())); + p.done(); + } + + @Test + public void testUnionTableWithEmptyMR() throws Exception { + Pipeline p = new SparkPipeline("local", "empty"); + assertFalse(Iterables.isEmpty(p.emptyPTable(Writables.tableOf(Writables.strings(), Writables.longs())) + .union( + p.read(From.textFile(tempDir.copyResourceFileName("shakes.txt"))) + .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()))) + .groupByKey() + .combineValues(Aggregators.SUM_LONGS()) + .materialize())); + p.done(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java index 674f0c8..49e1d35 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java @@ -21,13 +21,18 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import org.apache.crunch.CachingOptions; import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; import org.apache.crunch.PipelineExecution; import org.apache.crunch.PipelineResult; import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.impl.dist.collect.PCollectionImpl; +import org.apache.crunch.impl.spark.collect.EmptyPCollection; +import org.apache.crunch.impl.spark.collect.EmptyPTable; import org.apache.crunch.impl.spark.collect.SparkCollectFactory; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.materialize.MaterializableIterable; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.storage.StorageLevel; @@ -62,6 +67,16 @@ public class SparkPipeline extends DistributedPipeline { } @Override + public PCollection emptyPCollection(PType ptype) { + return new EmptyPCollection(this, ptype); + } + + @Override + public PTable emptyPTable(PTableType ptype) { + return new EmptyPTable(this, ptype); + } + + @Override public void cache(PCollection pcollection, CachingOptions options) { cachedCollections.put(pcollection, toStorageLevel(options)); } http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPCollection.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPCollection.java new file mode 100644 index 0000000..7a298fb --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPCollection.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.collect; + +import com.google.common.collect.ImmutableList; +import org.apache.crunch.impl.dist.DistributedPipeline; +import org.apache.crunch.impl.spark.SparkCollection; +import org.apache.crunch.impl.spark.SparkRuntime; +import org.apache.crunch.types.PType; +import org.apache.spark.api.java.JavaRDDLike; + +public class EmptyPCollection extends org.apache.crunch.impl.dist.collect.EmptyPCollection + implements SparkCollection { + + public EmptyPCollection(DistributedPipeline pipeline, PType ptype) { + super(pipeline, ptype); + } + + @Override + public JavaRDDLike getJavaRDDLike(SparkRuntime runtime) { + return runtime.getSparkContext().parallelize(ImmutableList.of()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPTable.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPTable.java new file mode 100644 index 0000000..97d42fd --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPTable.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.collect; + +import com.google.common.collect.ImmutableList; +import org.apache.crunch.impl.dist.DistributedPipeline; +import org.apache.crunch.impl.spark.SparkCollection; +import org.apache.crunch.impl.spark.SparkRuntime; +import org.apache.crunch.types.PTableType; +import org.apache.spark.api.java.JavaRDDLike; +import scala.Tuple2; + +public class EmptyPTable extends org.apache.crunch.impl.dist.collect.EmptyPTable implements SparkCollection { + + public EmptyPTable(DistributedPipeline pipeline, PTableType ptype) { + super(pipeline, ptype); + } + + @Override + public JavaRDDLike getJavaRDDLike(SparkRuntime runtime) { + return runtime.getSparkContext().parallelizePairs(ImmutableList.>of()); + } +}