crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-155: Don't trigger a MapReduce job to materialize an InputCollection.
Date Sun, 03 Feb 2013 16:52:08 GMT
Updated Branches:
  refs/heads/master 70c4edd0b -> 41f01c037


CRUNCH-155: Don't trigger a MapReduce job to materialize an InputCollection.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/41f01c03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/41f01c03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/41f01c03

Branch: refs/heads/master
Commit: 41f01c037007fb54091a69464d81c3269b269710
Parents: 70c4edd
Author: Josh Wills <jwills@apache.org>
Authored: Thu Jan 31 21:11:03 2013 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Sun Feb 3 08:49:00 2013 -0800

----------------------------------------------------------------------
 .../src/it/java/org/apache/crunch/UnionGbkIT.java  |  117 +++++++++++++++
 .../java/org/apache/crunch/impl/mr/MRPipeline.java |   62 ++++++---
 .../apache/crunch/impl/mr/collect/InputTable.java  |    4 +
 .../crunch/impl/mr/collect/PCollectionImpl.java    |    1 +
 .../org/apache/crunch/io/impl/FileSourceImpl.java  |    4 +
 .../org/apache/crunch/lib/join/MapsideJoin.java    |   10 +-
 .../crunch/materialize/MaterializableIterable.java |   28 +++--
 7 files changed, 193 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java b/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java
new file mode 100644
index 0000000..3937fe8
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class UnionGbkIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+  
+  MRPipeline pipeline;
+
+  public static class FirstLetterKeyFn extends DoFn<String, Pair<String, String>> {
+    @Override
+    public void process(String input, Emitter<Pair<String, String>> emitter) {
+      if (input.length() > 0) {
+        emitter.emit(Pair.of(input.substring(0, 1), input));
+      }
+    }
+  }
+
+  public static class ConcatGroupFn extends DoFn<Pair<String, Iterable<String>>, String> {
+    @Override
+    public void process(Pair<String, Iterable<String>> input, Emitter<String> emitter) {
+      StringBuilder sb = new StringBuilder();
+      for (String str : input.second()) {
+        sb.append(str);
+      }
+      emitter.emit(sb.toString());
+    }
+  }
+  
+  @Before
+  public void setUp() {
+    pipeline = new MRPipeline(UnionGbkIT.class, tmpDir.getDefaultConfiguration());
+  }
+
+  @After
+  public void tearDown() {
+    pipeline.done();
+  }
+
+  @Test
+  public void tableOfUnionGbk() throws Exception {
+    PCollection<String> words = pipeline.readTextFile(
+        tmpDir.copyResourceFileName("shakes.txt"));
+    PCollection<String> lorum = pipeline.readTextFile(
+        tmpDir.copyResourceFileName("maugham.txt"));
+    lorum.materialize();
+
+    @SuppressWarnings("unchecked")
+    PCollection<String> union = words.union(lorum);
+
+    PGroupedTable<String, String> groupedByFirstLetter =
+        union.parallelDo("byFirstLetter", new FirstLetterKeyFn(),
+            Avros.tableOf(Avros.strings(), Avros.strings()))
+        .groupByKey();
+    PCollection<String> concatted = groupedByFirstLetter
+        .parallelDo("concat", new ConcatGroupFn(), Avros.strings());
+
+    assertNotNull(concatted.materialize().iterator());
+  }
+
+  @Test
+  public void unionOfTablesGbk() throws Exception {
+    PCollection<String> words = pipeline.readTextFile(
+        tmpDir.copyResourceFileName("shakes.txt"));
+    PCollection<String> lorum = pipeline.readTextFile(
+        tmpDir.copyResourceFileName("maugham.txt"));
+    lorum.materialize();
+
+    PTable<String, String> wordsByFirstLetter =
+        words.parallelDo("byFirstLetter", new FirstLetterKeyFn(),
+            Avros.tableOf(Avros.strings(), Avros.strings()));
+    PTable<String, String> lorumByFirstLetter =
+        lorum.parallelDo("byFirstLetter", new FirstLetterKeyFn(),
+            Avros.tableOf(Avros.strings(), Avros.strings()));
+
+    @SuppressWarnings("unchecked")
+    PTable<String, String> union = wordsByFirstLetter.union(lorumByFirstLetter);
+
+    PGroupedTable<String, String> groupedByFirstLetter = union.groupByKey();
+
+    PCollection<String> concatted = groupedByFirstLetter.parallelDo("concat",
+        new ConcatGroupFn(), Avros.strings());
+
+    assertNotNull(concatted.materialize().iterator());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 6ef7491..9c98937 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -45,6 +45,7 @@ import org.apache.crunch.impl.mr.exec.MRExecutor;
 import org.apache.crunch.impl.mr.plan.MSCRPlanner;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.crunch.io.From;
+import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.crunch.io.To;
 import org.apache.crunch.materialize.MaterializableIterable;
@@ -156,8 +157,10 @@ public class MRPipeline implements Pipeline {
     for (PCollectionImpl<?> c : outputTargets.keySet()) {
       if (outputTargetsToMaterialize.containsKey(c)) {
         MaterializableIterable iter = outputTargetsToMaterialize.get(c);
-        iter.materialize();
-        c.materializeAt(iter.getSourceTarget());
+        if (iter.isSourceTarget()) {
+          iter.materialize();
+          c.materializeAt((SourceTarget) iter.getSource());
+        }
         outputTargetsToMaterialize.remove(c);
       } else {
         boolean materialized = false;
@@ -225,9 +228,9 @@ public class MRPipeline implements Pipeline {
   public <T> Iterable<T> materialize(PCollection<T> pcollection) {
 
     PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection);
-    ReadableSourceTarget<T> srcTarget = getMaterializeSourceTarget(pcollectionImpl);
+    ReadableSource<T> readableSrc = getMaterializeSourceTarget(pcollectionImpl);
 
-    MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget);
+    MaterializableIterable<T> c = new MaterializableIterable<T>(this, readableSrc);
     if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
       outputTargetsToMaterialize.put(pcollectionImpl, c);
     }
@@ -245,35 +248,56 @@ public class MRPipeline implements Pipeline {
    * @throws IllegalArgumentException If no ReadableSourceTarget can be retrieved for the given
    *           PCollection
    */
-  public <T> ReadableSourceTarget<T> getMaterializeSourceTarget(PCollection<T> pcollection) {
+  public <T> ReadableSource<T> getMaterializeSourceTarget(PCollection<T> pcollection) {
     PCollectionImpl<T> impl = toPcollectionImpl(pcollection);
+
+    // First, check to see if this is a readable input collection.
+    if (impl instanceof InputCollection) {
+      InputCollection<T> ic = (InputCollection<T>) impl;
+      if (ic.getSource() instanceof ReadableSource) {
+        return (ReadableSource) ic.getSource();
+      } else {
+        throw new IllegalArgumentException(
+            "Cannot materialize non-readable input collection: " + ic);
+      }
+    } else if (impl instanceof InputTable) {
+      InputTable it = (InputTable) impl;
+      if (it.getSource() instanceof ReadableSource) {
+        return (ReadableSource) it.getSource();
+      } else {
+        throw new IllegalArgumentException(
+            "Cannot materialize non-readable input table: " + it);
+      }
+    }
+
+    // Next, check to see if this pcollection has already been materialized.
     SourceTarget<T> matTarget = impl.getMaterializedAt();
     if (matTarget != null && matTarget instanceof ReadableSourceTarget) {
       return (ReadableSourceTarget<T>) matTarget;
     }
-
+    
+    // Check to see if we plan on materializing this collection on the
+    // next run.
     ReadableSourceTarget<T> srcTarget = null;
     if (outputTargets.containsKey(pcollection)) {
       for (Target target : outputTargets.get(impl)) {
         if (target instanceof ReadableSourceTarget) {
-          srcTarget = (ReadableSourceTarget<T>) target;
-          break;
+          return (ReadableSourceTarget<T>) target;
         }
       }
     }
 
-    if (srcTarget == null) {
-      SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
-      if (!(st instanceof ReadableSourceTarget)) {
-        throw new IllegalArgumentException("The PType for the given PCollection is not readable"
-            + " and cannot be materialized");
-      } else {
-        srcTarget = (ReadableSourceTarget<T>) st;
-        addOutput(impl, srcTarget);
-      }
+    // If we're not planning on materializing it already, create a temporary
+    // output to hold the materialized records and return that.
+    SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
+    if (!(st instanceof ReadableSourceTarget)) {
+      throw new IllegalArgumentException("The PType for the given PCollection is not readable"
+          + " and cannot be materialized");
+    } else {
+      srcTarget = (ReadableSourceTarget<T>) st;
+      addOutput(impl, srcTarget);
+      return srcTarget;
     }
-
-    return srcTarget;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
index 9f64803..71f11c5 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
@@ -40,6 +40,10 @@ public class InputTable<K, V> extends PTableBase<K, V> {
     this.asCollection = new InputCollection<Pair<K, V>>(source, pipeline);
   }
 
+  public TableSource<K, V> getSource() {
+    return source;
+  }
+  
   @Override
   protected long getSizeInternal() {
     return asCollection.getSizeInternal();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index 8ad6692..296043f 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -38,6 +38,7 @@ import org.apache.crunch.Target;
 import org.apache.crunch.fn.ExtractKeyFn;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.materialize.pobject.CollectionPObject;
 import org.apache.crunch.types.PTableType;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index 964c6a0..688c801 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -53,6 +53,10 @@ public class FileSourceImpl<T> implements Source<T> {
     this.inputBundle = inputBundle;
   }
 
+  public Path getPath() {
+    return path;
+  }
+  
   @Override
   public void configureSource(Job job, int inputId) throws IOException {
     if (inputId == -1) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
index 9b532c5..fa28155 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
@@ -25,6 +25,7 @@ import org.apache.crunch.Emitter;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.SourceTarget;
 import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.crunch.types.PType;
@@ -72,12 +73,13 @@ public class MapsideJoin {
     if (iterable instanceof MaterializableIterable) {
       MaterializableIterable<Pair<K, V>> mi = (MaterializableIterable<Pair<K, V>>) iterable;
       MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(), right.getPType());
-      ParallelDoOptions options = ParallelDoOptions.builder()
-          .sourceTargets(mi.getSourceTarget())
-          .build();
+      ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder();
+      if (mi.isSourceTarget()) {
+        optionsBuilder.sourceTargets((SourceTarget) mi.getSource());
+      }
       return left.parallelDo("mapjoin", mapJoinDoFn,
           tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())),
-          options);
+          optionsBuilder.build());
     } else { // in-memory pipeline
       return left.parallelDo(new InMemoryJoinFn<K, U, V>(iterable),
           tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())));

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
index 0ed29e3..2dcc64f 100644
--- a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
+++ b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
@@ -24,8 +24,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.SourceTarget;
 import org.apache.crunch.io.PathTarget;
-import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.hadoop.fs.Path;
 
 public class MaterializableIterable<E> implements Iterable<E> {
@@ -33,22 +35,28 @@ public class MaterializableIterable<E> implements Iterable<E> {
   private static final Log LOG = LogFactory.getLog(MaterializableIterable.class);
 
   private final Pipeline pipeline;
-  private final ReadableSourceTarget<E> sourceTarget;
+  private final ReadableSource<E> source;
   private Iterable<E> materialized;
 
-  public MaterializableIterable(Pipeline pipeline, ReadableSourceTarget<E> source) {
+  public MaterializableIterable(Pipeline pipeline, ReadableSource<E> source) {
     this.pipeline = pipeline;
-    this.sourceTarget = source;
+    this.source = source;
     this.materialized = null;
   }
 
-  public ReadableSourceTarget<E> getSourceTarget() {
-    return sourceTarget;
+  public ReadableSource<E> getSource() {
+    return source;
   }
 
+  public boolean isSourceTarget() {
+    return (source instanceof SourceTarget);
+  }
+  
   public Path getPath() {
-    if (sourceTarget instanceof PathTarget) {
-      return ((PathTarget) sourceTarget).getPath();
+    if (source instanceof FileSourceImpl) {
+      return ((FileSourceImpl) source).getPath();
+    } else if (source instanceof PathTarget) {
+      return ((PathTarget) source).getPath();
     }
     return null;
   }
@@ -64,9 +72,9 @@ public class MaterializableIterable<E> implements Iterable<E> {
 
   public void materialize() {
     try {
-      materialized = sourceTarget.read(pipeline.getConfiguration());
+      materialized = source.read(pipeline.getConfiguration());
     } catch (IOException e) {
-      LOG.error("Could not materialize: " + sourceTarget, e);
+      LOG.error("Could not materialize: " + source, e);
       throw new CrunchRuntimeException(e);
     }
   }


Mime
View raw message