crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject git commit: CRUNCH-163 Correct handling of same input in unions
Date Fri, 08 Feb 2013 16:15:47 GMT
Updated Branches:
  refs/heads/master 3f9d29b33 -> 170ba8eea


CRUNCH-163 Correct handling of same input in unions


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/170ba8ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/170ba8ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/170ba8ee

Branch: refs/heads/master
Commit: 170ba8eea530b965782fcbfefd1c5f3f236d1897
Parents: 3f9d29b
Author: Gabriel Reid <greid@apache.org>
Authored: Thu Feb 7 23:25:20 2013 +0100
Committer: Gabriel Reid <greid@apache.org>
Committed: Fri Feb 8 09:55:54 2013 +0100

----------------------------------------------------------------------
 .../org/apache/crunch/UnionFromSameSourceIT.java   |  132 +++++++++++++++
 .../crunch/impl/mr/collect/PCollectionImpl.java    |    4 +-
 .../java/org/apache/crunch/impl/mr/plan/Edge.java  |    9 +-
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java    |    8 +-
 .../org/apache/crunch/impl/mr/plan/Vertex.java     |    4 +-
 5 files changed, 151 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/170ba8ee/crunch/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java b/crunch/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java
new file mode 100644
index 0000000..501a944
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Collection of tests re-using the same PCollection in various unions.
+ */
+public class UnionFromSameSourceIT {
+
+  private static final int NUM_ELEMENTS = 4;
+
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  private Pipeline pipeline;
+  private PType<String> elementType = Writables.strings();
+  private PTableType<String, String> tableType = Writables.tableOf(Writables.strings(),
+    Writables.strings());
+
+  @Before
+  public void setUp() {
+    pipeline = new MRPipeline(UnionFromSameSourceIT.class, tmpDir.getDefaultConfiguration());
+  }
+
+  @Test
+  public void testUnion_SingleRead() throws IOException {
+    PCollection<String> strings = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
+    PCollection<String> union = strings.union(strings.parallelDo(IdentityFn.<String> getInstance(),
+      strings.getPType()));
+
+    assertEquals(NUM_ELEMENTS * 2, getCount(union));
+  }
+
+  @Test
+  public void testUnion_TwoReads() throws IOException {
+    PCollection<String> stringsA = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
+    PCollection<String> stringsB = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
+
+    PCollection<String> union = stringsA.union(stringsB);
+
+    assertEquals(NUM_ELEMENTS * 2, getCount(union));
+  }
+
+  @Test
+  public void testDoubleUnion_EndingWithGBK() throws IOException {
+    runDoubleUnionPipeline(true);
+  }
+
+  @Test
+  public void testDoubleUnion_EndingWithoutGBK() throws IOException {
+    runDoubleUnionPipeline(false);
+  }
+
+  private void runDoubleUnionPipeline(boolean endWithGBK) throws IOException {
+    PCollection<String> strings = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
+    PTable<String, String> tableA = strings.parallelDo("to table A", new ToTableFn(), tableType);
+    PTable<String, String> tableB = strings.parallelDo("to table B", new ToTableFn(), tableType);
+
+    PGroupedTable<String, String> groupedTable = tableA.union(tableB).groupByKey();
+    PCollection<String> ungrouped = groupedTable.parallelDo("ungroup before union",
+      new FromGroupedTableFn(), elementType).union(
+      strings.parallelDo("fake id", IdentityFn.<String> getInstance(), elementType));
+
+    PTable<String, String> table = ungrouped.parallelDo("union back to table", new ToTableFn(),
+      tableType);
+
+    if (endWithGBK) {
+      table = table.groupByKey().ungroup();
+    }
+
+    assertEquals(3 * NUM_ELEMENTS, getCount(table));
+  }
+
+  private int getCount(PCollection<?> pcollection) {
+    int cnt = 0;
+    for (Object v : pcollection.materialize()) {
+      cnt++;
+    }
+    return cnt;
+  }
+
+  private static class ToTableFn extends MapFn<String, Pair<String, String>> {
+
+    @Override
+    public Pair<String, String> map(String input) {
+      return Pair.of(input, input);
+    }
+
+  }
+
+  private static class FromGroupedTableFn extends DoFn<Pair<String, Iterable<String>>, String> {
+
+    @Override
+    public void process(Pair<String, Iterable<String>> input, Emitter<String> emitter) {
+      for (String value : input.second()) {
+        emitter.emit(value);
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/170ba8ee/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index 296043f..f48308a 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -36,9 +36,9 @@ import org.apache.crunch.Pipeline;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.ExtractKeyFn;
+import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.DoNode;
-import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.materialize.pobject.CollectionPObject;
 import org.apache.crunch.types.PTableType;
@@ -81,7 +81,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
     List<PCollectionImpl<S>> internal = Lists.newArrayList();
     internal.add(this);
     for (PCollection<S> collection : collections) {
-      internal.add((PCollectionImpl<S>) collection);
+      internal.add((PCollectionImpl<S>) collection.parallelDo(IdentityFn.<S>getInstance(), collection.getPType()));
     }
     return new UnionCollection<S>(internal);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/170ba8ee/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
index cf6fc37..1e59df0 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
 import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
 
@@ -108,11 +110,16 @@ class Edge {
       return false;
     }
     Edge e = (Edge) other;
-    return head.equals(e.head) && tail.equals(e.tail);
+    return head.equals(e.head) && tail.equals(e.tail) && paths.equals(e.paths);
   }
   
   @Override
   public int hashCode() {
     return new HashCodeBuilder().append(head).append(tail).toHashCode();
   }
+  
+  @Override
+  public String toString() {
+    return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/170ba8ee/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 3718ec2..472505b 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -266,16 +266,19 @@ public class MSCRPlanner {
         assignment.put(v, prototype);
       }
     } else {
+      Set<Edge> usedEdges = Sets.newHashSet();
       for (Vertex g : gbks) {
         Set<NodePath> inputs = Sets.newHashSet();
         for (Edge e : g.getIncomingEdges()) {
           inputs.addAll(e.getNodePaths());
+          usedEdges.add(e);
         }
         JobPrototype prototype = JobPrototype.createMapReduceJob(
             (PGroupedTableImpl) g.getPCollection(), inputs, pipeline.createTempPath());
         assignment.put(g, prototype);
         for (Edge e : g.getIncomingEdges()) {
           assignment.put(e.getHead(), prototype);
+          usedEdges.add(e);
         }
         HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
         for (Edge e : g.getOutgoingEdges()) {
@@ -284,6 +287,7 @@ public class MSCRPlanner {
             outputPaths.putAll(t, e.getNodePaths());
           }
           assignment.put(output, prototype);
+          usedEdges.add(e);
         }
         prototype.addReducePaths(outputPaths);
       }
@@ -299,7 +303,7 @@ public class MSCRPlanner {
         boolean vertexHasUnassignedIncomingEdges = false;
         if (v.isOutput()) {
           for (Edge e : v.getIncomingEdges()) {
-            if (!assignment.containsKey(e.getHead())) {
+            if (!usedEdges.contains(e)) {
               vertexHasUnassignedIncomingEdges = true;
             }
           }
@@ -308,7 +312,7 @@ public class MSCRPlanner {
         if (v.isOutput() && (vertexHasUnassignedIncomingEdges || !assignment.containsKey(v))) {
           orphans.add(v);
           for (Edge e : v.getIncomingEdges()) {
-            if (vertexHasUnassignedIncomingEdges && assignment.containsKey(e.getHead())) {
+            if (vertexHasUnassignedIncomingEdges && usedEdges.contains(e)) {
               // We've already dealt with this incoming edge
               continue;
             }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/170ba8ee/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
index 99fc8ba..f4aa668 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
 import org.apache.crunch.Source;
 import org.apache.crunch.impl.mr.collect.InputCollection;
 import org.apache.crunch.impl.mr.collect.PCollectionImpl;
@@ -119,6 +120,7 @@ class Vertex {
 
   @Override
   public String toString() {
-    return ReflectionToStringBuilder.toStringExclude(this, Lists.newArrayList("outgoing", "incoming"));
+    return new ReflectionToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).setExcludeFieldNames(
+        new String[] { "outgoing", "incoming" }).toString();
   }
 }


Mime
View raw message