Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 186D810A87 for ; Mon, 26 Jan 2015 02:15:20 +0000 (UTC) Received: (qmail 70135 invoked by uid 500); 26 Jan 2015 02:15:15 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 70101 invoked by uid 500); 26 Jan 2015 02:15:15 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 70091 invoked by uid 99); 26 Jan 2015 02:15:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 26 Jan 2015 02:15:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2A4BCE04BF; Mon, 26 Jan 2015 02:15:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: crunch git commit: CRUNCH-489: Add methods for creating PCollections from Java Iterables on the client and optionally naming created/input PCollections. Date: Mon, 26 Jan 2015 02:15:15 +0000 (UTC) Repository: crunch Updated Branches: refs/heads/master ab7e5e4c3 -> 006cd72a3 CRUNCH-489: Add methods for creating PCollections from Java Iterables on the client and optionally naming created/input PCollections. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/006cd72a Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/006cd72a Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/006cd72a Branch: refs/heads/master Commit: 006cd72a383d91ba15ec0d6b596872a062567d48 Parents: ab7e5e4 Author: Josh Wills Authored: Thu Jan 22 15:38:57 2015 -0800 Committer: Josh Wills Committed: Sun Jan 25 09:48:35 2015 -0800 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/PageRankIT.java | 32 ++-- .../java/org/apache/crunch/io/NLineInputIT.java | 25 ++- .../crunch/io/avro/AvroFileSourceTargetIT.java | 19 ++ .../crunch/io/avro/AvroMemPipelineIT.java | 6 +- .../java/org/apache/crunch/CreateOptions.java | 59 ++++++ .../main/java/org/apache/crunch/Pipeline.java | 62 +++++++ .../crunch/impl/dist/DistributedPipeline.java | 55 +++++- .../impl/dist/collect/BaseInputCollection.java | 4 +- .../impl/dist/collect/BaseInputTable.java | 6 +- .../impl/dist/collect/PCollectionFactory.java | 2 + .../impl/dist/collect/PCollectionImpl.java | 9 +- .../crunch/impl/dist/collect/PTableBase.java | 4 +- .../org/apache/crunch/impl/mem/MemPipeline.java | 52 ++++-- .../crunch/impl/mr/collect/InputCollection.java | 4 +- .../crunch/impl/mr/collect/InputTable.java | 4 +- .../impl/mr/collect/MRCollectionFactory.java | 6 +- .../java/org/apache/crunch/types/PType.java | 14 ++ .../crunch/types/avro/AvroGroupedTableType.java | 11 ++ .../org/apache/crunch/types/avro/AvroType.java | 46 +++++ .../writable/WritableGroupedTableType.java | 13 ++ .../types/writable/WritableTableType.java | 36 ++++ .../crunch/types/writable/WritableType.java | 56 ++++++ .../src/it/java/org/apache/crunch/CreateIT.java | 88 +++++++++ .../java/org/apache/crunch/SparkPageRankIT.java | 27 ++- .../apache/crunch/impl/spark/SparkPipeline.java | 14 ++ .../impl/spark/collect/CreatedCollection.java | 149 +++++++++++++++ .../crunch/impl/spark/collect/CreatedTable.java | 180 +++++++++++++++++++ .../impl/spark/collect/InputCollection.java | 6 +- .../crunch/impl/spark/collect/InputTable.java | 6 +- .../impl/spark/collect/SparkCollectFactory.java | 6 +- .../crunch/impl/spark/serde/AvroSerDe.java | 6 - .../crunch/impl/spark/serde/SerDeFactory.java | 39 ++++ 32 files changed, 977 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java index 23c71b3..701f78a 100644 --- a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import java.util.Collection; import java.util.List; +import com.google.common.collect.ImmutableList; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.lib.Aggregate; @@ -41,6 +42,19 @@ import com.google.common.collect.Lists; public class PageRankIT { + private static List URLS = ImmutableList.of( + "www.A.com www.B.com", + "www.A.com www.C.com", + "www.A.com www.D.com", + "www.A.com www.E.com", + "www.B.com www.D.com", + "www.B.com www.E.com", + "www.C.com www.D.com", + "www.D.com www.B.com", + "www.E.com www.A.com", + "www.F.com www.B.com", + "www.F.com www.C.com"); + public static class PageRankData { public float score; public float lastScore; @@ -76,35 +90,31 @@ public class PageRankIT { public void testAvroReflect() throws Exception { PTypeFamily tf = AvroTypeFamily.getInstance(); PType prType = Avros.reflects(PageRankData.class); - String urlInput = tmpDir.copyResourceFileName("urls.txt"); run(new MRPipeline(PageRankIT.class, tmpDir.getDefaultConfiguration()), - urlInput, prType, tf); + prType, tf); } @Test public void testAvroMReflectInMemory() throws Exception { PTypeFamily tf = AvroTypeFamily.getInstance(); PType prType = Avros.reflects(PageRankData.class); - String urlInput = tmpDir.copyResourceFileName("urls.txt"); - run(MemPipeline.getInstance(), urlInput, prType, tf); + run(MemPipeline.getInstance(), prType, tf); } @Test public void testAvroJSON() throws Exception { PTypeFamily tf = AvroTypeFamily.getInstance(); PType prType = PTypes.jsonString(PageRankData.class, tf); - String urlInput = tmpDir.copyResourceFileName("urls.txt"); run(new MRPipeline(PageRankIT.class, tmpDir.getDefaultConfiguration()), - urlInput, prType, tf); + prType, tf); } @Test public void testWritablesJSON() throws Exception { PTypeFamily tf = WritableTypeFamily.getInstance(); PType prType = PTypes.jsonString(PageRankData.class, tf); - String urlInput = tmpDir.copyResourceFileName("urls.txt"); run(new MRPipeline(PageRankIT.class, tmpDir.getDefaultConfiguration()), - urlInput, prType, tf); + prType, tf); } public static PTable pageRank(PTable input, final float d) { @@ -134,13 +144,13 @@ public class PageRankIT { }, input.getValueType()); } - public static void run(Pipeline pipeline, String urlInput, + public static void run(Pipeline pipeline, PType prType, PTypeFamily ptf) throws Exception { - PTable scores = pipeline.readTextFile(urlInput) + PTable scores = pipeline.create(URLS, ptf.strings()) .parallelDo(new MapFn>() { @Override public Pair map(String input) { - String[] urls = input.split("\\t"); + String[] urls = input.split("\\s+"); return Pair.of(urls[0], urls[1]); } }, ptf.tableOf(ptf.strings(), ptf.strings())).groupByKey() http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/it/java/org/apache/crunch/io/NLineInputIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/NLineInputIT.java b/crunch-core/src/it/java/org/apache/crunch/io/NLineInputIT.java index 52b8ff5..2f54b8a 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/NLineInputIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/NLineInputIT.java @@ -19,12 +19,13 @@ package org.apache.crunch.io; import static org.junit.Assert.assertEquals; +import com.google.common.collect.ImmutableList; +import org.apache.crunch.CreateOptions; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; import org.apache.crunch.PCollection; import org.apache.crunch.Pipeline; import org.apache.crunch.impl.mr.MRPipeline; -import org.apache.crunch.io.text.NLineFileSource; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.writable.Writables; @@ -33,19 +34,33 @@ import org.apache.hadoop.conf.Configuration; import org.junit.Rule; import org.junit.Test; +import java.util.List; + public class NLineInputIT { + private static List URLS = ImmutableList.of( + "www.A.com www.B.com", + "www.A.com www.C.com", + "www.A.com www.D.com", + "www.A.com www.E.com", + "www.B.com www.D.com", + "www.B.com www.E.com", + "www.C.com www.D.com", + "www.D.com www.B.com", + "www.E.com www.A.com", + "www.F.com www.B.com", + "www.F.com www.C.com"); + @Rule public TemporaryPath tmpDir = TemporaryPaths.create(); - + @Test public void testNLine() throws Exception { - String urlsInputPath = tmpDir.copyResourceFileName("urls.txt"); Configuration conf = new Configuration(tmpDir.getDefaultConfiguration()); conf.setInt("io.sort.mb", 10); Pipeline pipeline = new MRPipeline(NLineInputIT.class, conf); - PCollection urls = pipeline.read(new NLineFileSource(urlsInputPath, - Writables.strings(), 2)); + PCollection urls = pipeline.create(URLS, Writables.strings(), + CreateOptions.parallelism(6)); assertEquals(new Integer(2), urls.parallelDo(new LineCountFn(), Avros.ints()).max().getValue()); } http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java index 511e827..6c3f340 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.List; +import com.google.common.collect.ImmutableList; import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; @@ -191,6 +192,24 @@ public class AvroFileSourceTargetIT implements Serializable { } @Test + public void testGenericCreate() throws IOException { + String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson"); + Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson); + GenericData.Record savedRecord = new GenericData.Record(genericPersonSchema); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + + Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); + PCollection genericCollection = pipeline.create(ImmutableList.of(savedRecord), + Avros.generics(genericPersonSchema)); + + List recordList = Lists.newArrayList(genericCollection.materialize()); + + assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(recordList)); + } + + @Test public void testReflect() throws IOException { Schema pojoPersonSchema = ReflectData.get().getSchema(StringWrapper.class); GenericRecord savedRecord = new GenericData.Record(pojoPersonSchema); http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java index e501373..142f5fb 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.avro.generic.GenericData; @@ -63,9 +64,8 @@ public class AvroMemPipelineIT implements Serializable { Person writeRecord = createSpecificRecord(); - final PCollection writeCollection = MemPipeline.typedCollectionOf( - Avros.specifics(Person.class), - writeRecord); + final PCollection writeCollection = MemPipeline.getInstance().create( + ImmutableList.of(writeRecord), Avros.specifics(Person.class)); writeCollection.write(To.avroFile(avroFile.getAbsolutePath())); http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/CreateOptions.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/CreateOptions.java b/crunch-core/src/main/java/org/apache/crunch/CreateOptions.java new file mode 100644 index 0000000..18bcc83 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/CreateOptions.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import com.google.common.base.Preconditions; + +/** + * Additional options that can be specified when creating a new PCollection using {@link Pipeline#create}. + */ +public class CreateOptions { + + public static CreateOptions none() { + return new CreateOptions("CREATED", 1); + } + + public static CreateOptions parallelism(int parallelism) { + return new CreateOptions("CREATED", parallelism); + } + + public static CreateOptions name(String name) { + return new CreateOptions(name, 1); + } + + public static CreateOptions nameAndParallelism(String name, int parallelism) { + return new CreateOptions(name, parallelism); + } + + private final String name; + private final int parallelism; + + private CreateOptions(String name, int parallelism) { + this.name = Preconditions.checkNotNull(name); + Preconditions.checkArgument(parallelism > 0, "Invalid parallelism value = %d", parallelism); + this.parallelism = parallelism; + } + + public String getName() { + return name; + } + + public int getParallelism() { + return parallelism; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java index cd3f3f6..ee11fee 100644 --- a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java @@ -55,6 +55,16 @@ public interface Pipeline { PCollection read(Source source); /** + * Converts the given {@code Source} into a {@code PCollection} that is + * available to jobs run using this {@code Pipeline} instance. + * + * @param source The source of data + * @param named A name for the returned PCollection + * @return A PCollection that references the given source + */ + PCollection read(Source source, String named); + + /** * A version of the read method for {@code TableSource} instances that map to * {@code PTable}s. * @@ -64,6 +74,16 @@ public interface Pipeline { */ PTable read(TableSource tableSource); + /** + * A version of the read method for {@code TableSource} instances that map to + * {@code PTable}s. + * + * @param tableSource The source of the data + * @param named A name for the returned PTable + * @return A PTable that references the given source + */ + PTable read(TableSource tableSource, String named); + /** * Write the given collection to the given target on the next pipeline run. The * system will check to see if the target's location already exists using the @@ -128,6 +148,48 @@ public interface Pipeline { PTable emptyPTable(PTableType ptype); /** + * Creates a {@code PCollection} containing the values found in the given {@code Iterable} + * using an implementation-specific distribution mechanism. + * + * @param contents The values the new PCollection will contain + * @param ptype The PType of the PCollection + * @return A PCollection that contains the given values + */ + PCollection create(Iterable contents, PType ptype); + + /** + * Creates a {@code PCollection} containing the values found in the given {@code Iterable} + * using an implementation-specific distribution mechanism. + * + * @param contents The values the new PCollection will contain + * @param ptype The PType of the PCollection + * @param options Additional options, such as the name or desired parallelism of the PCollection + * @return A PCollection that contains the given values + */ + PCollection create(Iterable contents, PType ptype, CreateOptions options); + + /** + * Creates a {@code PTable} containing the values found in the given {@code Iterable} + * using an implementation-specific distribution mechanism. + * + * @param contents The values the new PTable will contain + * @param ptype The PTableType of the PTable + * @return A PTable that contains the given values + */ + PTable create(Iterable> contents, PTableType ptype); + + /** + * Creates a {@code PTable} containing the values found in the given {@code Iterable} + * using an implementation-specific distribution mechanism. + * + * @param contents The values the new PTable will contain + * @param ptype The PTableType of the PTable + * @param options Additional options, such as the name or desired parallelism of the PTable + * @return A PTable that contains the given values + */ + PTable create(Iterable> contents, PTableType ptype, CreateOptions options); + + /** * Executes the given {@code PipelineCallable} on the client after the {@code Targets} * that the PipelineCallable depends on (if any) have been created by other pipeline * processing steps. http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java index 64510f4..61c01f1 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java @@ -17,16 +17,19 @@ */ package org.apache.crunch.impl.dist; +import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.crunch.CreateOptions; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; +import org.apache.crunch.Pair; import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.Pipeline; -import org.apache.crunch.PipelineResult; import org.apache.crunch.PipelineCallable; +import org.apache.crunch.PipelineResult; import org.apache.crunch.Source; import org.apache.crunch.SourceTarget; import org.apache.crunch.TableSource; @@ -133,11 +136,19 @@ public abstract class DistributedPipeline implements Pipeline { } public PCollection read(Source source) { - return factory.createInputCollection(source, this, getCurrentPDoOptions()); + return read(source, null); + } + + public PCollection read(Source source, String named) { + return factory.createInputCollection(source, named, this, getCurrentPDoOptions()); } public PTable read(TableSource source) { - return factory.createInputTable(source, this, getCurrentPDoOptions()); + return read(source, null); + } + + public PTable read(TableSource source, String named) { + return factory.createInputTable(source, named, this, getCurrentPDoOptions()); } private ParallelDoOptions getCurrentPDoOptions() { @@ -228,6 +239,44 @@ public abstract class DistributedPipeline implements Pipeline { return new EmptyPTable(this, ptype); } + @Override + public PCollection create(Iterable contents, PType ptype) { + return create(contents, ptype, CreateOptions.none()); + } + + @Override + public PCollection create(Iterable contents, PType ptype, CreateOptions options) { + if (Iterables.isEmpty(contents)) { + return emptyPCollection(ptype); + } + ReadableSource src = null; + try { + src = ptype.createSourceTarget(getConfiguration(), createTempPath(), contents, options.getParallelism()); + } catch (IOException e) { + throw new CrunchRuntimeException("Error creating PCollection: " + contents, e); + } + return read(src); + } + + @Override + public PTable create(Iterable> contents, PTableType ptype) { + return create(contents, ptype, CreateOptions.none()); + } + + @Override + public PTable create(Iterable> contents, PTableType ptype, CreateOptions options) { + if (Iterables.isEmpty(contents)) { + return emptyPTable(ptype); + } + ReadableSource> src = null; + try { + src = ptype.createSourceTarget(getConfiguration(), createTempPath(), contents, options.getParallelism()); + } catch (IOException e) { + throw new CrunchRuntimeException("Error creating PTable: " + contents, e); + } + return read(src).parallelDo(IdentityFn.>getInstance(), ptype); + } + /** * Retrieve a ReadableSourceTarget that provides access to the contents of a {@link PCollection}. * This is primarily intended as a helper method to {@link #materialize(PCollection)}. The http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java index 8d3887d..2a7e5c2 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java @@ -37,8 +37,8 @@ public class BaseInputCollection extends PCollectionImpl { this.source = source; } - public BaseInputCollection(Source source, DistributedPipeline pipeline, ParallelDoOptions doOpts) { - super(source.toString(), pipeline, doOpts); + public BaseInputCollection(Source source, String name, DistributedPipeline pipeline, ParallelDoOptions doOpts) { + super(name == null ? source.toString() : name, pipeline, doOpts); this.source = source; } http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java index cbab255..18a671b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java @@ -37,13 +37,13 @@ public class BaseInputTable extends PTableBase { super(source.toString(), pipeline); this.source = source; this.asCollection = pipeline.getFactory().createInputCollection( - source, pipeline, ParallelDoOptions.builder().build()); + source, source.toString(), pipeline, ParallelDoOptions.builder().build()); } - public BaseInputTable(TableSource source, DistributedPipeline pipeline, ParallelDoOptions doOpts) { + public BaseInputTable(TableSource source, String name, DistributedPipeline pipeline, ParallelDoOptions doOpts) { super(source.toString(), pipeline, doOpts); this.source = source; - this.asCollection = pipeline.getFactory().createInputCollection(source, pipeline, doOpts); + this.asCollection = pipeline.getFactory().createInputCollection(source, name, pipeline, doOpts); } public TableSource getSource() { http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java index 9077fc9..4cd7d03 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java @@ -35,11 +35,13 @@ public interface PCollectionFactory { BaseInputCollection createInputCollection( Source source, + String named, DistributedPipeline distributedPipeline, ParallelDoOptions doOpts); BaseInputTable createInputTable( TableSource source, + String named, DistributedPipeline distributedPipeline, ParallelDoOptions doOpts); http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java index e2287ff..f098f2b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java @@ -169,7 +169,9 @@ public abstract class PCollectionImpl implements PCollection { public PCollection write(Target target) { if (materializedAt != null) { - getPipeline().write(pipeline.getFactory().createInputCollection(materializedAt, pipeline, doOptions), target); + getPipeline().write( + pipeline.getFactory().createInputCollection(materializedAt, getName(), pipeline, doOptions), + target); } else { getPipeline().write(this, target); } @@ -180,7 +182,7 @@ public abstract class PCollectionImpl implements PCollection { public PCollection write(Target target, Target.WriteMode writeMode) { if (materializedAt != null) { getPipeline().write( - pipeline.getFactory().createInputCollection(materializedAt, pipeline, doOptions), + pipeline.getFactory().createInputCollection(materializedAt, getName(), pipeline, doOptions), target, writeMode); } else { @@ -203,7 +205,8 @@ public abstract class PCollectionImpl implements PCollection { public void accept(Visitor visitor) { if (materializedAt != null) { - visitor.visitInputCollection(pipeline.getFactory().createInputCollection(materializedAt, pipeline, doOptions)); + visitor.visitInputCollection( + pipeline.getFactory().createInputCollection(materializedAt, getName(), pipeline, doOptions)); } else { acceptInternal(visitor); } http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java index a893b9e..e81773e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java @@ -92,7 +92,7 @@ public abstract class PTableBase extends PCollectionImpl> imple public PTable write(Target target) { if (getMaterializedAt() != null) { getPipeline().write(pipeline.getFactory().createInputTable( - (TableSource) getMaterializedAt(), pipeline, doOptions), target); + (TableSource) getMaterializedAt(), getName(), pipeline, doOptions), target); } else { getPipeline().write(this, target); } @@ -103,7 +103,7 @@ public abstract class PTableBase extends PCollectionImpl> imple public PTable write(Target target, Target.WriteMode writeMode) { if (getMaterializedAt() != null) { getPipeline().write(pipeline.getFactory().createInputTable( - (TableSource) getMaterializedAt(), pipeline, doOptions), target, writeMode); + (TableSource) getMaterializedAt(), getName(), pipeline, doOptions), target, writeMode); } else { getPipeline().write(this, target, writeMode); } http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index 49e5662..e61b6dc 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -31,14 +30,15 @@ import com.google.common.base.Charsets; import org.apache.avro.file.DataFileWriter; import org.apache.avro.io.DatumWriter; import org.apache.crunch.CachingOptions; +import org.apache.crunch.CreateOptions; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineCallable; import org.apache.crunch.PipelineExecution; import org.apache.crunch.PipelineResult; -import org.apache.crunch.PipelineCallable; import org.apache.crunch.Source; import org.apache.crunch.TableSource; import org.apache.crunch.Target; @@ -151,32 +151,44 @@ public class MemPipeline implements Pipeline { @Override public PCollection read(Source source) { + return read(source, null); + } + + @Override + public PCollection read(Source source, String named) { + String name = named == null ? source.toString() : named; if (source instanceof ReadableSource) { try { Iterable iterable = ((ReadableSource) source).read(conf); - return new MemCollection(iterable, source.getType(), source.toString()); + return new MemCollection(iterable, source.getType(), name); } catch (IOException e) { - LOG.error("Exception reading source: " + source.toString(), e); + LOG.error("Exception reading source: " + name, e); throw new IllegalStateException(e); } } - LOG.error("Source {} is not readable", source); - throw new IllegalStateException("Source " + source + " is not readable"); + LOG.error("Source {} is not readable", name); + throw new IllegalStateException("Source " + name + " is not readable"); } @Override public PTable read(TableSource source) { + return read(source, null); + } + + @Override + public PTable read(TableSource source, String named) { + String name = named == null ? source.toString() : named; if (source instanceof ReadableSource) { try { Iterable> iterable = ((ReadableSource>) source).read(conf); - return new MemTable(iterable, source.getTableType(), source.toString()); + return new MemTable(iterable, source.getTableType(), name); } catch (IOException e) { - LOG.error("Exception reading source: " + source, e); + LOG.error("Exception reading source: " + name, e); throw new IllegalStateException(e); } } - LOG.error("Source {} is not readable", source); - throw new IllegalStateException("Source " + source + " is not readable"); + LOG.error("Source {} is not readable", name); + throw new IllegalStateException("Source " + name + " is not readable"); } @Override @@ -332,6 +344,26 @@ public class MemPipeline implements Pipeline { } @Override + public PCollection create(Iterable contents, PType ptype) { + return create(contents, ptype, CreateOptions.none()); + } + + @Override + public PCollection create(Iterable iterable, PType ptype, CreateOptions options) { + return typedCollectionOf(ptype, iterable); + } + + @Override + public PTable create(Iterable> contents, PTableType ptype) { + return create(contents, ptype, CreateOptions.none()); + } + + @Override + public PTable create(Iterable> contents, PTableType ptype, CreateOptions options) { + return typedTableOf(ptype, contents); + } + + @Override public Output sequentialDo(PipelineCallable callable) { Output out = callable.generateOutput(this); try { http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java index ea189f8..79a427e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java @@ -28,8 +28,8 @@ import org.apache.crunch.io.ReadableSource; public class InputCollection extends BaseInputCollection implements MRCollection { - public InputCollection(Source source, MRPipeline pipeline, ParallelDoOptions doOpts) { - super(source, pipeline, doOpts); + public InputCollection(Source source, String name, MRPipeline pipeline, ParallelDoOptions doOpts) { + super(source, name, pipeline, doOpts); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java index 3154190..380382c 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java @@ -26,8 +26,8 @@ import org.apache.crunch.impl.mr.plan.DoNode; public class InputTable extends BaseInputTable implements MRCollection { - public InputTable(TableSource source, MRPipeline pipeline, ParallelDoOptions doOpts) { - super(source, pipeline, doOpts); + public InputTable(TableSource source, String name, MRPipeline pipeline, ParallelDoOptions doOpts) { + super(source, name, pipeline, doOpts); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java index ede88f2..4e0ef7d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java @@ -52,17 +52,19 @@ public class MRCollectionFactory implements PCollectionFactory { @Override public BaseInputCollection createInputCollection( Source source, + String name, DistributedPipeline pipeline, ParallelDoOptions doOpts) { - return new InputCollection(source, (MRPipeline) pipeline, doOpts); + return new InputCollection(source, name, (MRPipeline) pipeline, doOpts); } @Override public BaseInputTable createInputTable( TableSource source, + String name, DistributedPipeline pipeline, ParallelDoOptions doOpts) { - return new InputTable(source, (MRPipeline) pipeline, doOpts); + return new InputTable(source, name, (MRPipeline) pipeline, doOpts); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/types/PType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PType.java b/crunch-core/src/main/java/org/apache/crunch/types/PType.java index ebddf84..258ce79 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/PType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/PType.java @@ -17,12 +17,14 @@ */ package org.apache.crunch.types; +import java.io.IOException; import java.io.Serializable; import java.util.List; import org.apache.crunch.DoFn; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; +import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.ReadableSourceTarget; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -80,6 +82,18 @@ public interface PType extends Serializable { ReadableSourceTarget getDefaultFileSource(Path path); /** + * Returns a {@code ReadableSource} that contains the data in the given {@code Iterable}. + * + * @param conf The Configuration to use + * @param path The path to write the data to + * @param contents The contents to write + * @param parallelism The desired parallelism + * @return A new instance of ReadableSource + */ + ReadableSource createSourceTarget(Configuration conf, Path path, Iterable contents, int parallelism) + throws IOException; + + /** * Returns the sub-types that make up this PType if it is a composite instance, such as a tuple. */ List getSubTypes(); http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java index 3df313f..c359afc 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java @@ -17,6 +17,7 @@ */ package org.apache.crunch.types.avro; +import java.io.IOException; import java.util.Collection; import org.apache.avro.mapred.AvroJob; @@ -27,11 +28,13 @@ import org.apache.crunch.GroupingOptions; import org.apache.crunch.MapFn; import org.apache.crunch.Pair; import org.apache.crunch.fn.PairMapFn; +import org.apache.crunch.io.ReadableSource; import org.apache.crunch.lib.PTables; import org.apache.crunch.types.Converter; import org.apache.crunch.types.PGroupedTableType; import org.apache.crunch.types.PTableType; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; /** @@ -112,4 +115,12 @@ class AvroGroupedTableType extends PGroupedTableType { } } + @Override + public ReadableSource>> createSourceTarget( + Configuration conf, + Path path, + Iterable>> contents, + int parallelism) throws IOException { + throw new UnsupportedOperationException("GroupedTableTypes do not support creating ReadableSources"); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java index 4e35b91..9dbf6b0 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java @@ -17,22 +17,32 @@ */ package org.apache.crunch.types.avro; +import java.io.IOException; import java.util.List; import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; +import org.apache.avro.io.DatumWriter; import org.apache.avro.specific.SpecificRecord; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.crunch.MapFn; import org.apache.crunch.fn.IdentityFn; +import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.ReadableSourceTarget; +import org.apache.crunch.io.avro.AvroFileSource; import org.apache.crunch.io.avro.AvroFileSourceTarget; import org.apache.crunch.types.Converter; import org.apache.crunch.types.DeepCopier; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -50,6 +60,7 @@ public class AvroType implements PType { GENERIC } + private static final Logger LOG = LoggerFactory.getLogger(AvroType.class); private static final Converter AVRO_CONVERTER = new AvroKeyConverter(); private final Class typeClass; @@ -197,6 +208,41 @@ public class AvroType implements PType { } @Override + public ReadableSource createSourceTarget(Configuration conf, Path path, Iterable contents, int parallelism) + throws IOException { + FileSystem fs = FileSystem.get(conf); + baseOutputMapFn.setConfiguration(conf); + baseOutputMapFn.initialize(); + fs.mkdirs(path); + List streams = Lists.newArrayListWithExpectedSize(parallelism); + List writers = Lists.newArrayListWithExpectedSize(parallelism); + for (int i = 0; i < parallelism; i++) { + Path out = new Path(path, "out" + i); + FSDataOutputStream stream = fs.create(out); + DatumWriter datumWriter = Avros.newWriter(this); + DataFileWriter writer = new DataFileWriter(datumWriter); + writer.create(getSchema(), stream); + + streams.add(stream); + writers.add(writer); + } + int target = 0; + for (T value : contents) { + writers.get(target).append(baseOutputMapFn.map(value)); + target = (target + 1) % parallelism; + } + for (DataFileWriter writer : writers) { + writer.close(); + } + for (FSDataOutputStream stream : streams) { + stream.close(); + } + ReadableSource ret = new AvroFileSource(path, this); + ret.inputConf(RuntimeParameters.DISABLE_COMBINE_FILE, "true"); + return ret; + } + + @Override public void initialize(Configuration conf) { baseInputMapFn.setConfiguration(conf); baseInputMapFn.initialize(); http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java index 3167591..c25345b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java @@ -20,12 +20,16 @@ package org.apache.crunch.types.writable; import org.apache.crunch.GroupingOptions; import org.apache.crunch.MapFn; import org.apache.crunch.Pair; +import org.apache.crunch.io.ReadableSource; import org.apache.crunch.lib.PTables; import org.apache.crunch.types.Converter; import org.apache.crunch.types.PGroupedTableType; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; +import java.io.IOException; + class WritableGroupedTableType extends PGroupedTableType { private final MapFn inputFn; @@ -73,6 +77,15 @@ class WritableGroupedTableType extends PGroupedTableType { } @Override + public ReadableSource>> createSourceTarget( + Configuration conf, + Path path, + Iterable>> contents, + int parallelism) throws IOException { + throw new UnsupportedOperationException("GroupedTableTypes do not support creating ReadableSources"); + } + + @Override public void configureShuffle(Job job, GroupingOptions options) { if (options != null) { options.configure(job); http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTableType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTableType.java index 93e0fd6..d0cc915 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTableType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTableType.java @@ -17,13 +17,19 @@ */ package org.apache.crunch.types.writable; +import java.io.IOException; import java.util.List; +import com.google.common.collect.Lists; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.crunch.MapFn; import org.apache.crunch.Pair; import org.apache.crunch.fn.PairMapFn; +import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.ReadableSourceTarget; +import org.apache.crunch.io.seq.SeqFileSource; +import org.apache.crunch.io.seq.SeqFileTableSource; import org.apache.crunch.io.seq.SeqFileTableSourceTarget; import org.apache.crunch.lib.PTables; import org.apache.crunch.types.Converter; @@ -32,7 +38,10 @@ import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import com.google.common.collect.ImmutableList; @@ -103,6 +112,33 @@ class WritableTableType implements PTableType { } @Override + public ReadableSource> createSourceTarget( + Configuration conf, Path path, Iterable> contents, int parallelism) throws IOException { + FileSystem fs = FileSystem.get(conf); + outputFn.setConfiguration(conf); + outputFn.initialize(); + fs.mkdirs(path); + List writers = Lists.newArrayListWithExpectedSize(parallelism); + for (int i = 0; i < parallelism; i++) { + Path out = new Path(path, "out" + i); + writers.add(SequenceFile.createWriter(fs, conf, out, keyType.getSerializationClass(), + valueType.getSerializationClass())); + } + int target = 0; + for (Pair value : contents) { + Pair writablePair = (Pair) outputFn.map(value); + writers.get(target).append(writablePair.first(), writablePair.second()); + target = (target + 1) % parallelism; + } + for (SequenceFile.Writer writer : writers) { + writer.close(); + } + ReadableSource> ret = new SeqFileTableSource(path, this); + ret.inputConf(RuntimeParameters.DISABLE_COMBINE_FILE, "true"); + return ret; + } + + @Override public void initialize(Configuration conf) { keyType.initialize(conf); valueType.initialize(conf); http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java index ea3019b..2baf14b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java @@ -17,23 +17,40 @@ */ package org.apache.crunch.types.writable; +import java.io.IOException; import java.util.List; +import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.crunch.MapFn; +import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.ReadableSourceTarget; +import org.apache.crunch.io.seq.SeqFileSource; import org.apache.crunch.io.seq.SeqFileSourceTarget; +import org.apache.crunch.io.text.NLineFileSource; +import org.apache.crunch.io.text.TextFileSource; import org.apache.crunch.types.Converter; import org.apache.crunch.types.DeepCopier; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class WritableType implements PType { + private static final Logger LOG = LoggerFactory.getLogger(WritableType.class); + private final Class typeClass; private final Class writableClass; private final Converter converter; @@ -112,6 +129,45 @@ public class WritableType implements PType { } @Override + public ReadableSource createSourceTarget(Configuration conf, Path path, Iterable contents, int parallelism) + throws IOException { + FileSystem fs = FileSystem.get(conf); + outputFn.setConfiguration(conf); + outputFn.initialize(); + if (Text.class.equals(writableClass) && parallelism > 1) { + FSDataOutputStream out = fs.create(path); + byte[] newLine = "\r\n".getBytes(Charsets.UTF_8); + double contentSize = 0; + for (T value : contents) { + Text txt = (Text) outputFn.map(value); + out.write(txt.toString().getBytes(Charsets.UTF_8)); + out.write(newLine); + contentSize++; + } + out.close(); + return new NLineFileSource(path, this, (int) Math.ceil(contentSize / parallelism)); + } else { // Use sequence files + fs.mkdirs(path); + List writers = Lists.newArrayListWithExpectedSize(parallelism); + for (int i = 0; i < parallelism; i++) { + Path out = new Path(path, "out" + i); + writers.add(SequenceFile.createWriter(fs, conf, out, NullWritable.class, writableClass)); + } + int target = 0; + for (T value : contents) { + writers.get(target).append(NullWritable.get(), outputFn.map(value)); + target = (target + 1) % parallelism; + } + for (SequenceFile.Writer writer : writers) { + writer.close(); + } + ReadableSource ret = new SeqFileSource(path, this); + ret.inputConf(RuntimeParameters.DISABLE_COMBINE_FILE, "true"); + return ret; + } + } + + @Override public boolean equals(Object obj) { if (obj == null || !(obj instanceof WritableType)) { return false; http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/it/java/org/apache/crunch/CreateIT.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/it/java/org/apache/crunch/CreateIT.java b/crunch-spark/src/it/java/org/apache/crunch/CreateIT.java new file mode 100644 index 0000000..0fca3ec --- /dev/null +++ b/crunch-spark/src/it/java/org/apache/crunch/CreateIT.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import com.google.common.collect.ImmutableList; +import org.apache.crunch.fn.Aggregators; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.impl.spark.SparkPipeline; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.writable.WritableTypeFamily; +import org.junit.Rule; +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class CreateIT { + + @Rule + public TemporaryPath tmpDir = new TemporaryPath(); + + @Test + public void testMRWritable() throws Exception { + run(new MRPipeline(CreateIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance()); + } + + @Test + public void testMRAvro() throws Exception { + run(new MRPipeline(CreateIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance()); + } + + @Test + public void testMemWritable() throws Exception { + run(MemPipeline.getInstance(), WritableTypeFamily.getInstance()); + } + + @Test + public void testMemAvro() throws Exception { + run(MemPipeline.getInstance(), AvroTypeFamily.getInstance()); + } + + @Test + public void testSparkWritable() throws Exception { + run(new SparkPipeline("local", "CreateIT", CreateIT.class, tmpDir.getDefaultConfiguration()), + WritableTypeFamily.getInstance()); + } + + @Test + public void testSparkAvro() throws Exception { + run(new SparkPipeline("local", "CreateIT", CreateIT.class, tmpDir.getDefaultConfiguration()), + AvroTypeFamily.getInstance()); + } + + public static void run(Pipeline p, PTypeFamily ptf) { + PTable in = p.create( + ImmutableList.of( + Pair.of("a", 2L), Pair.of("b", 3L), Pair.of("c", 5L), + Pair.of("a", 1L), Pair.of("b", 8L), Pair.of("c", 7L)), + ptf.tableOf(ptf.strings(), ptf.longs()), + CreateOptions.nameAndParallelism("in", 2)); + PTable out = in.groupByKey().combineValues(Aggregators.SUM_LONGS()); + Map values = out.materializeToMap(); + assertEquals(3, values.size()); + assertEquals(3L, values.get("a").longValue()); + assertEquals(11L, values.get("b").longValue()); + assertEquals(12L, values.get("c").longValue()); + p.done(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java index 47b9300..91adb8a 100644 --- a/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java +++ b/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java @@ -17,6 +17,7 @@ */ package org.apache.crunch; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.crunch.impl.spark.SparkPipeline; @@ -39,6 +40,19 @@ import static org.junit.Assert.assertEquals; public class SparkPageRankIT { + private static List URLS = ImmutableList.of( + "www.A.com www.B.com", + "www.A.com www.C.com", + "www.A.com www.D.com", + "www.A.com www.E.com", + "www.B.com www.D.com", + "www.B.com www.E.com", + "www.C.com www.D.com", + "www.D.com www.B.com", + "www.E.com www.A.com", + "www.F.com www.B.com", + "www.F.com www.C.com"); + public static class PageRankData { public float score; public float lastScore; @@ -80,8 +94,7 @@ public class SparkPageRankIT { public void testAvroReflects() throws Exception { PTypeFamily tf = AvroTypeFamily.getInstance(); PType prType = Avros.reflects(PageRankData.class); - String urlInput = tmpDir.copyResourceFileName("urls.txt"); - run(pipeline, urlInput, prType, tf); + run(pipeline, prType, tf); pipeline.done(); } @@ -89,8 +102,7 @@ public class SparkPageRankIT { public void testWritablesJSON() throws Exception { PTypeFamily tf = WritableTypeFamily.getInstance(); PType prType = PTypes.jsonString(PageRankData.class, tf); - String urlInput = tmpDir.copyResourceFileName("urls.txt"); - run(pipeline, urlInput, prType, tf); + run(pipeline, prType, tf); pipeline.done(); } @@ -121,13 +133,13 @@ public class SparkPageRankIT { }, input.getValueType()); } - public static void run(Pipeline pipeline, String urlInput, + public static void run(Pipeline pipeline, PType prType, PTypeFamily ptf) throws Exception { - PTable scores = pipeline.readTextFile(urlInput) + PTable scores = pipeline.create(URLS, ptf.strings()) .parallelDo(new MapFn>() { @Override public Pair map(String input) { - String[] urls = input.split("\\t"); + String[] urls = input.split("\\s+"); return Pair.of(urls[0], urls[1]); } }, ptf.tableOf(ptf.strings(), ptf.strings())).groupByKey() @@ -150,5 +162,6 @@ public class SparkPageRankIT { }, ptf.floats())).getValue(); } assertEquals(0.0048, delta, 0.001); + pipeline.done(); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java index 3367d3c..8108de8 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java @@ -20,12 +20,16 @@ package org.apache.crunch.impl.spark; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import org.apache.crunch.CachingOptions; +import org.apache.crunch.CreateOptions; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; +import org.apache.crunch.Pair; import org.apache.crunch.PipelineExecution; import org.apache.crunch.PipelineResult; import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.impl.dist.collect.PCollectionImpl; +import org.apache.crunch.impl.spark.collect.CreatedCollection; +import org.apache.crunch.impl.spark.collect.CreatedTable; import org.apache.crunch.impl.spark.collect.EmptyPCollection; import org.apache.crunch.impl.spark.collect.EmptyPTable; import org.apache.crunch.impl.spark.collect.SparkCollectFactory; @@ -92,6 +96,16 @@ public class SparkPipeline extends DistributedPipeline { } @Override + public PCollection create(Iterable contents, PType ptype, CreateOptions options) { + return new CreatedCollection(this, contents, ptype, options); + } + + @Override + public PTable create(Iterable> contents, PTableType ptype, CreateOptions options) { + return new CreatedTable(this, contents, ptype, options); + } + + @Override public void cache(PCollection pcollection, CachingOptions options) { cachedCollections.put(pcollection, toStorageLevel(options)); } http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/CreatedCollection.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/CreatedCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/CreatedCollection.java new file mode 100644 index 0000000..8c473ad --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/CreatedCollection.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.collect; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.crunch.CreateOptions; +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.MapFn; +import org.apache.crunch.ReadableData; +import org.apache.crunch.impl.dist.collect.PCollectionImpl; +import org.apache.crunch.impl.spark.ByteArray; +import org.apache.crunch.impl.spark.SparkCollection; +import org.apache.crunch.impl.spark.SparkPipeline; +import org.apache.crunch.impl.spark.SparkRuntime; +import org.apache.crunch.impl.spark.SparkRuntimeContext; +import org.apache.crunch.impl.spark.serde.SerDe; +import org.apache.crunch.impl.spark.serde.SerDeFactory; +import org.apache.crunch.types.PType; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaRDDLike; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.storage.StorageLevel; + +import java.io.IOException; +import java.util.List; + +/** + * Represents a Spark-based PCollection that was created from a Java {@code Iterable} of + * values. + */ +public class CreatedCollection extends PCollectionImpl implements SparkCollection { + + private final Iterable contents; + private final PType ptype; + private final int parallelism; + private JavaRDD rdd; + + public CreatedCollection(SparkPipeline p, Iterable contents, PType ptype, CreateOptions options) { + super(options.getName(), p); + this.contents = contents; + this.ptype = ptype; + this.parallelism = options.getParallelism(); + } + + @Override + protected void acceptInternal(Visitor visitor) { + // No-op + } + + @Override + public List> getParents() { + return ImmutableList.of(); + } + + @Override + protected ReadableData getReadableDataInternal() { + try { + return ptype.createSourceTarget(getPipeline().getConfiguration(), + getPipeline().createTempPath(), contents, parallelism).asReadable(); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } + } + + @Override + protected long getSizeInternal() { + return Iterables.size(contents); + } + + @Override + public long getLastModifiedAt() { + return -1; + } + + @Override + public PType getPType() { + return ptype; + } + + @Override + public JavaRDDLike getJavaRDDLike(SparkRuntime runtime) { + if (!runtime.isValid(rdd)) { + rdd = getJavaRDDLikeInternal(runtime); + rdd.rdd().setName(getName()); + StorageLevel sl = runtime.getStorageLevel(this); + if (sl != null) { + rdd.rdd().persist(sl); + } + } + return rdd; + } + + private JavaRDD getJavaRDDLikeInternal(SparkRuntime runtime) { + SerDe serde = SerDeFactory.create(ptype, runtime.getConfiguration()); + ptype.initialize(runtime.getConfiguration()); + List res = Lists.newLinkedList(); + try { + for (T value : contents) { + res.add(serde.toBytes(ptype.getOutputMapFn().map(value))); + } + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + return runtime.getSparkContext() + .parallelize(res, parallelism) + .map(new MapInputFn(serde, ptype.getInputMapFn(), runtime.getRuntimeContext())); + } + + static class MapInputFn implements Function { + + private final SerDe serde; + private final MapFn fn; + private final SparkRuntimeContext context; + private boolean initialized; + + public MapInputFn(SerDe serde, MapFn fn, SparkRuntimeContext context) { + this.serde = serde; + this.fn = fn; + this.context = context; + this.initialized = false; + } + + @Override + public T call(ByteArray byteArray) throws Exception { + if (!initialized) { + context.initialize(fn, -1); + initialized = true; + } + return fn.map(serde.fromBytes(byteArray.value)); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/CreatedTable.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/CreatedTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/CreatedTable.java new file mode 100644 index 0000000..fb18787 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/CreatedTable.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.collect; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.crunch.CreateOptions; +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.MapFn; +import org.apache.crunch.Pair; +import org.apache.crunch.ReadableData; +import org.apache.crunch.impl.dist.collect.PCollectionImpl; +import org.apache.crunch.impl.dist.collect.PTableBase; +import org.apache.crunch.impl.spark.ByteArray; +import org.apache.crunch.impl.spark.SparkCollection; +import org.apache.crunch.impl.spark.SparkPipeline; +import org.apache.crunch.impl.spark.SparkRuntime; +import org.apache.crunch.impl.spark.SparkRuntimeContext; +import org.apache.crunch.impl.spark.serde.SerDe; +import org.apache.crunch.impl.spark.serde.SerDeFactory; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDDLike; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.storage.StorageLevel; +import scala.Tuple2; + +import java.io.IOException; +import java.util.List; + +/** + * Represents a Spark-based PTable that was created from a Java {@code Iterable} of + * key-value pairs. + */ +public class CreatedTable extends PTableBase implements SparkCollection { + + private final Iterable> contents; + private final PTableType ptype; + private final int parallelism; + private JavaPairRDD rdd; + + public CreatedTable( + SparkPipeline pipeline, + Iterable> contents, + PTableType ptype, + CreateOptions options) { + super(options.getName(), pipeline); + this.contents = contents; + this.ptype = ptype; + this.parallelism = options.getParallelism(); + } + + @Override + protected void acceptInternal(Visitor visitor) { + // No-op + } + + @Override + public List> getParents() { + return ImmutableList.of(); + } + + @Override + protected ReadableData> getReadableDataInternal() { + try { + return ptype.createSourceTarget(pipeline.getConfiguration(), + pipeline.createTempPath(), contents, parallelism).asReadable(); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } + } + + @Override + protected long getSizeInternal() { + return Iterables.size(contents); + } + + @Override + public long getLastModifiedAt() { + return -1; + } + + @Override + public PTableType getPTableType() { + return ptype; + } + + @Override + public PType> getPType() { + return ptype; + } + + @Override + public JavaRDDLike getJavaRDDLike(SparkRuntime runtime) { + if (!runtime.isValid(rdd)) { + rdd = getJavaRDDLikeInternal(runtime); + rdd.rdd().setName(getName()); + StorageLevel sl = runtime.getStorageLevel(this); + if (sl != null) { + rdd.rdd().persist(sl); + } + } + return rdd; + } + + private JavaPairRDD getJavaRDDLikeInternal(SparkRuntime runtime) { + ptype.initialize(runtime.getConfiguration()); + PType keyType = ptype.getKeyType(); + PType valueType = ptype.getValueType(); + SerDe keySerde = SerDeFactory.create(keyType, runtime.getConfiguration()); + SerDe valueSerde = SerDeFactory.create(valueType, runtime.getConfiguration()); + List> res = Lists.newLinkedList(); + try { + for (Pair p : contents) { + ByteArray key = keySerde.toBytes(keyType.getOutputMapFn().map(p.first())); + ByteArray value = valueSerde.toBytes(valueType.getOutputMapFn().map(p.second())); + res.add(new Tuple2(key, value)); + } + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + return runtime.getSparkContext() + .parallelizePairs(res, parallelism) + .mapToPair(new MapPairInputFn( + keySerde, valueSerde, keyType.getInputMapFn(), valueType.getInputMapFn(), runtime.getRuntimeContext())); + } + + static class MapPairInputFn implements PairFunction, K, V> { + + private final SerDe keySerde; + private final SerDe valueSerde; + private final MapFn keyFn; + private final MapFn valueFn; + private final SparkRuntimeContext context; + private boolean initialized; + + public MapPairInputFn( + SerDe keySerde, + SerDe valueSerde, + MapFn keyFn, + MapFn valueFn, + SparkRuntimeContext context) { + this.keySerde = keySerde; + this.valueSerde = valueSerde; + this.keyFn = keyFn; + this.valueFn = valueFn; + this.context = context; + this.initialized = false; + } + + @Override + public Tuple2 call(Tuple2 in) throws Exception { + if (!initialized) { + context.initialize(keyFn, -1); + context.initialize(valueFn, -1); + initialized = true; + } + return new Tuple2( + keyFn.map(keySerde.fromBytes(in._1().value)), + valueFn.map(valueSerde.fromBytes(in._2().value))); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java index 8273236..51e1eac 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java @@ -38,8 +38,8 @@ import java.io.IOException; public class InputCollection extends BaseInputCollection implements SparkCollection { - InputCollection(Source source, DistributedPipeline pipeline, ParallelDoOptions doOpts) { - super(source, pipeline, doOpts); + InputCollection(Source source, String named, DistributedPipeline pipeline, ParallelDoOptions doOpts) { + super(source, named, pipeline, doOpts); } public JavaRDDLike getJavaRDDLike(SparkRuntime runtime) { @@ -53,7 +53,7 @@ public class InputCollection extends BaseInputCollection implements SparkC CrunchInputFormat.class, converter.getKeyClass(), converter.getValueClass()); - input.rdd().setName(source.toString()); + input.rdd().setName(getName()); MapFn mapFn = converter.applyPTypeTransforms() ? source.getType().getInputMapFn() : IdentityFn.getInstance(); return input .map(new InputConverterFunction(source.getConverter())) http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java index a0f7189..72963ec 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java @@ -37,8 +37,8 @@ import java.io.IOException; public class InputTable extends BaseInputTable implements SparkCollection { - public InputTable(TableSource source, DistributedPipeline pipeline, ParallelDoOptions doOpts) { - super(source, pipeline, doOpts); + public InputTable(TableSource source, String named, DistributedPipeline pipeline, ParallelDoOptions doOpts) { + super(source, named, pipeline, doOpts); } @Override @@ -52,7 +52,7 @@ public class InputTable extends BaseInputTable implements SparkColle CrunchInputFormat.class, converter.getKeyClass(), converter.getValueClass()); - input.rdd().setName(source.toString()); + input.rdd().setName(getName()); MapFn mapFn = converter.applyPTypeTransforms() ? source.getType().getInputMapFn() : IdentityFn.getInstance(); return input .map(new InputConverterFunction(source.getConverter())) http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java index 1421945..37f3254 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java @@ -45,17 +45,19 @@ public class SparkCollectFactory implements PCollectionFactory { @Override public BaseInputCollection createInputCollection( Source source, + String named, DistributedPipeline pipeline, ParallelDoOptions doOpts) { - return new InputCollection(source, pipeline, doOpts); + return new InputCollection(source, named, pipeline, doOpts); } @Override public BaseInputTable createInputTable( TableSource source, + String named, DistributedPipeline pipeline, ParallelDoOptions doOpts) { - return new InputTable(source, pipeline, doOpts); + return new InputTable(source, named, pipeline, doOpts); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java index cef60e3..5b21851 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java @@ -18,18 +18,12 @@ package org.apache.crunch.impl.spark.serde; import com.google.common.base.Function; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.avro.specific.SpecificDatumWriter; import org.apache.crunch.impl.spark.ByteArray; import org.apache.crunch.impl.spark.ByteArrayHelper; import org.apache.crunch.types.avro.AvroMode; http://git-wip-us.apache.org/repos/asf/crunch/blob/006cd72a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDeFactory.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDeFactory.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDeFactory.java new file mode 100644 index 0000000..afab611 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDeFactory.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.serde; + +import org.apache.crunch.types.PType; +import org.apache.crunch.types.avro.AvroMode; +import org.apache.crunch.types.avro.AvroType; +import org.apache.crunch.types.writable.WritableType; +import org.apache.crunch.types.writable.WritableTypeFamily; +import org.apache.hadoop.conf.Configuration; + +import java.util.Map; + +public class SerDeFactory { + public static SerDe create(PType ptype, Configuration conf) { + if (WritableTypeFamily.getInstance().equals(ptype.getFamily())) { + return new WritableSerDe(((WritableType) ptype).getSerializationClass()); + } else { + AvroType at = (AvroType) ptype; + Map props = AvroMode.fromType(at).withFactoryFromConfiguration(conf).getModeProperties(); + return new AvroSerDe(at, props); + } + } +}