Return-Path: X-Original-To: apmail-incubator-crunch-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-crunch-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DBFDFEDAA for ; Sun, 3 Feb 2013 16:52:08 +0000 (UTC) Received: (qmail 61773 invoked by uid 500); 3 Feb 2013 16:52:08 -0000 Delivered-To: apmail-incubator-crunch-commits-archive@incubator.apache.org Received: (qmail 61737 invoked by uid 500); 3 Feb 2013 16:52:08 -0000 Mailing-List: contact crunch-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: crunch-dev@incubator.apache.org Delivered-To: mailing list crunch-commits@incubator.apache.org Received: (qmail 61729 invoked by uid 99); 3 Feb 2013 16:52:08 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Feb 2013 16:52:08 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 4363882975A; Sun, 3 Feb 2013 16:52:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: crunch-commits@incubator.apache.org X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-155: Don't trigger a MapReduce job to materialize an InputCollection. Message-Id: <20130203165208.4363882975A@tyr.zones.apache.org> Date: Sun, 3 Feb 2013 16:52:08 +0000 (UTC) Updated Branches: refs/heads/master 70c4edd0b -> 41f01c037 CRUNCH-155: Don't trigger a MapReduce job to materialize an InputCollection. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/41f01c03 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/41f01c03 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/41f01c03 Branch: refs/heads/master Commit: 41f01c037007fb54091a69464d81c3269b269710 Parents: 70c4edd Author: Josh Wills Authored: Thu Jan 31 21:11:03 2013 -0800 Committer: Josh Wills Committed: Sun Feb 3 08:49:00 2013 -0800 ---------------------------------------------------------------------- .../src/it/java/org/apache/crunch/UnionGbkIT.java | 117 +++++++++++++++ .../java/org/apache/crunch/impl/mr/MRPipeline.java | 62 ++++++--- .../apache/crunch/impl/mr/collect/InputTable.java | 4 + .../crunch/impl/mr/collect/PCollectionImpl.java | 1 + .../org/apache/crunch/io/impl/FileSourceImpl.java | 4 + .../org/apache/crunch/lib/join/MapsideJoin.java | 10 +- .../crunch/materialize/MaterializableIterable.java | 28 +++-- 7 files changed, 193 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java b/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java new file mode 100644 index 0000000..3937fe8 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import static org.junit.Assert.assertNotNull; + +import org.apache.crunch.PCollection; +import org.apache.crunch.PGroupedTable; +import org.apache.crunch.PTable; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.avro.Avros; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class UnionGbkIT { + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + MRPipeline pipeline; + + public static class FirstLetterKeyFn extends DoFn> { + @Override + public void process(String input, Emitter> emitter) { + if (input.length() > 0) { + emitter.emit(Pair.of(input.substring(0, 1), input)); + } + } + } + + public static class ConcatGroupFn extends DoFn>, String> { + @Override + public void process(Pair> input, Emitter emitter) { + StringBuilder sb = new StringBuilder(); + for (String str : input.second()) { + sb.append(str); + } + emitter.emit(sb.toString()); + } + } + + @Before + public void setUp() { + pipeline = new MRPipeline(UnionGbkIT.class, tmpDir.getDefaultConfiguration()); + } + + @After + public void tearDown() { + pipeline.done(); + } + + @Test + public void tableOfUnionGbk() throws Exception { + PCollection words = pipeline.readTextFile( + tmpDir.copyResourceFileName("shakes.txt")); + PCollection lorum = pipeline.readTextFile( + tmpDir.copyResourceFileName("maugham.txt")); + lorum.materialize(); + + @SuppressWarnings("unchecked") + PCollection union = words.union(lorum); + + PGroupedTable groupedByFirstLetter = + union.parallelDo("byFirstLetter", new FirstLetterKeyFn(), + Avros.tableOf(Avros.strings(), Avros.strings())) + .groupByKey(); + PCollection concatted = groupedByFirstLetter + .parallelDo("concat", new ConcatGroupFn(), Avros.strings()); + + assertNotNull(concatted.materialize().iterator()); + } + + @Test + public void unionOfTablesGbk() throws Exception { + PCollection words = pipeline.readTextFile( + tmpDir.copyResourceFileName("shakes.txt")); + PCollection lorum = pipeline.readTextFile( + tmpDir.copyResourceFileName("maugham.txt")); + lorum.materialize(); + + PTable wordsByFirstLetter = + words.parallelDo("byFirstLetter", new FirstLetterKeyFn(), + Avros.tableOf(Avros.strings(), Avros.strings())); + PTable lorumByFirstLetter = + lorum.parallelDo("byFirstLetter", new FirstLetterKeyFn(), + Avros.tableOf(Avros.strings(), Avros.strings())); + + @SuppressWarnings("unchecked") + PTable union = wordsByFirstLetter.union(lorumByFirstLetter); + + PGroupedTable groupedByFirstLetter = union.groupByKey(); + + PCollection concatted = groupedByFirstLetter.parallelDo("concat", + new ConcatGroupFn(), Avros.strings()); + + assertNotNull(concatted.materialize().iterator()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index 6ef7491..9c98937 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -45,6 +45,7 @@ import org.apache.crunch.impl.mr.exec.MRExecutor; import org.apache.crunch.impl.mr.plan.MSCRPlanner; import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.crunch.io.From; +import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.ReadableSourceTarget; import org.apache.crunch.io.To; import org.apache.crunch.materialize.MaterializableIterable; @@ -156,8 +157,10 @@ public class MRPipeline implements Pipeline { for (PCollectionImpl c : outputTargets.keySet()) { if (outputTargetsToMaterialize.containsKey(c)) { MaterializableIterable iter = outputTargetsToMaterialize.get(c); - iter.materialize(); - c.materializeAt(iter.getSourceTarget()); + if (iter.isSourceTarget()) { + iter.materialize(); + c.materializeAt((SourceTarget) iter.getSource()); + } outputTargetsToMaterialize.remove(c); } else { boolean materialized = false; @@ -225,9 +228,9 @@ public class MRPipeline implements Pipeline { public Iterable materialize(PCollection pcollection) { PCollectionImpl pcollectionImpl = toPcollectionImpl(pcollection); - ReadableSourceTarget srcTarget = getMaterializeSourceTarget(pcollectionImpl); + ReadableSource readableSrc = getMaterializeSourceTarget(pcollectionImpl); - MaterializableIterable c = new MaterializableIterable(this, srcTarget); + MaterializableIterable c = new MaterializableIterable(this, readableSrc); if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) { outputTargetsToMaterialize.put(pcollectionImpl, c); } @@ -245,35 +248,56 @@ public class MRPipeline implements Pipeline { * @throws IllegalArgumentException If no ReadableSourceTarget can be retrieved for the given * PCollection */ - public ReadableSourceTarget getMaterializeSourceTarget(PCollection pcollection) { + public ReadableSource getMaterializeSourceTarget(PCollection pcollection) { PCollectionImpl impl = toPcollectionImpl(pcollection); + + // First, check to see if this is a readable input collection. + if (impl instanceof InputCollection) { + InputCollection ic = (InputCollection) impl; + if (ic.getSource() instanceof ReadableSource) { + return (ReadableSource) ic.getSource(); + } else { + throw new IllegalArgumentException( + "Cannot materialize non-readable input collection: " + ic); + } + } else if (impl instanceof InputTable) { + InputTable it = (InputTable) impl; + if (it.getSource() instanceof ReadableSource) { + return (ReadableSource) it.getSource(); + } else { + throw new IllegalArgumentException( + "Cannot materialize non-readable input table: " + it); + } + } + + // Next, check to see if this pcollection has already been materialized. SourceTarget matTarget = impl.getMaterializedAt(); if (matTarget != null && matTarget instanceof ReadableSourceTarget) { return (ReadableSourceTarget) matTarget; } - + + // Check to see if we plan on materializing this collection on the + // next run. ReadableSourceTarget srcTarget = null; if (outputTargets.containsKey(pcollection)) { for (Target target : outputTargets.get(impl)) { if (target instanceof ReadableSourceTarget) { - srcTarget = (ReadableSourceTarget) target; - break; + return (ReadableSourceTarget) target; } } } - if (srcTarget == null) { - SourceTarget st = createIntermediateOutput(pcollection.getPType()); - if (!(st instanceof ReadableSourceTarget)) { - throw new IllegalArgumentException("The PType for the given PCollection is not readable" - + " and cannot be materialized"); - } else { - srcTarget = (ReadableSourceTarget) st; - addOutput(impl, srcTarget); - } + // If we're not planning on materializing it already, create a temporary + // output to hold the materialized records and return that. + SourceTarget st = createIntermediateOutput(pcollection.getPType()); + if (!(st instanceof ReadableSourceTarget)) { + throw new IllegalArgumentException("The PType for the given PCollection is not readable" + + " and cannot be materialized"); + } else { + srcTarget = (ReadableSourceTarget) st; + addOutput(impl, srcTarget); + return srcTarget; } - - return srcTarget; } /** http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java index 9f64803..71f11c5 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java @@ -40,6 +40,10 @@ public class InputTable extends PTableBase { this.asCollection = new InputCollection>(source, pipeline); } + public TableSource getSource() { + return source; + } + @Override protected long getSizeInternal() { return asCollection.getSizeInternal(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java index 8ad6692..296043f 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java @@ -38,6 +38,7 @@ import org.apache.crunch.Target; import org.apache.crunch.fn.ExtractKeyFn; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.impl.mr.plan.DoNode; +import org.apache.crunch.io.ReadableSource; import org.apache.crunch.lib.Aggregate; import org.apache.crunch.materialize.pobject.CollectionPObject; import org.apache.crunch.types.PTableType; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java index 964c6a0..688c801 100644 --- a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java +++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java @@ -53,6 +53,10 @@ public class FileSourceImpl implements Source { this.inputBundle = inputBundle; } + public Path getPath() { + return path; + } + @Override public void configureSource(Job job, int inputId) throws IOException { if (inputId == -1) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java index 9b532c5..fa28155 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java +++ b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java @@ -25,6 +25,7 @@ import org.apache.crunch.Emitter; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.ParallelDoOptions; +import org.apache.crunch.SourceTarget; import org.apache.crunch.io.ReadableSourceTarget; import org.apache.crunch.materialize.MaterializableIterable; import org.apache.crunch.types.PType; @@ -72,12 +73,13 @@ public class MapsideJoin { if (iterable instanceof MaterializableIterable) { MaterializableIterable> mi = (MaterializableIterable>) iterable; MapsideJoinDoFn mapJoinDoFn = new MapsideJoinDoFn(mi.getPath().toString(), right.getPType()); - ParallelDoOptions options = ParallelDoOptions.builder() - .sourceTargets(mi.getSourceTarget()) - .build(); + ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder(); + if (mi.isSourceTarget()) { + optionsBuilder.sourceTargets((SourceTarget) mi.getSource()); + } return left.parallelDo("mapjoin", mapJoinDoFn, tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())), - options); + optionsBuilder.build()); } else { // in-memory pipeline return left.parallelDo(new InMemoryJoinFn(iterable), tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType()))); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java index 0ed29e3..2dcc64f 100644 --- a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java +++ b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java @@ -24,8 +24,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.Pipeline; +import org.apache.crunch.SourceTarget; import org.apache.crunch.io.PathTarget; -import org.apache.crunch.io.ReadableSourceTarget; +import org.apache.crunch.io.ReadableSource; +import org.apache.crunch.io.impl.FileSourceImpl; import org.apache.hadoop.fs.Path; public class MaterializableIterable implements Iterable { @@ -33,22 +35,28 @@ public class MaterializableIterable implements Iterable { private static final Log LOG = LogFactory.getLog(MaterializableIterable.class); private final Pipeline pipeline; - private final ReadableSourceTarget sourceTarget; + private final ReadableSource source; private Iterable materialized; - public MaterializableIterable(Pipeline pipeline, ReadableSourceTarget source) { + public MaterializableIterable(Pipeline pipeline, ReadableSource source) { this.pipeline = pipeline; - this.sourceTarget = source; + this.source = source; this.materialized = null; } - public ReadableSourceTarget getSourceTarget() { - return sourceTarget; + public ReadableSource getSource() { + return source; } + public boolean isSourceTarget() { + return (source instanceof SourceTarget); + } + public Path getPath() { - if (sourceTarget instanceof PathTarget) { - return ((PathTarget) sourceTarget).getPath(); + if (source instanceof FileSourceImpl) { + return ((FileSourceImpl) source).getPath(); + } else if (source instanceof PathTarget) { + return ((PathTarget) source).getPath(); } return null; } @@ -64,9 +72,9 @@ public class MaterializableIterable implements Iterable { public void materialize() { try { - materialized = sourceTarget.read(pipeline.getConfiguration()); + materialized = source.read(pipeline.getConfiguration()); } catch (IOException e) { - LOG.error("Could not materialize: " + sourceTarget, e); + LOG.error("Could not materialize: " + source, e); throw new CrunchRuntimeException(e); } }