crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-379: Fix unions of PTables and PCollections in crunch-spark
Date Tue, 22 Apr 2014 02:23:23 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 617105b3d -> a96e30892


CRUNCH-379: Fix unions of PTables and PCollections in crunch-spark


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a96e3089
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a96e3089
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a96e3089

Branch: refs/heads/apache-crunch-0.8
Commit: a96e30892596bb9914d85673288262f6f7d74d67
Parents: 617105b
Author: Josh Wills <jwills@apache.org>
Authored: Mon Apr 21 08:28:26 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Mon Apr 21 19:20:55 2014 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/SparkUnionResultsIT.java    | 18 ++++++++++++++++++
 .../impl/spark/collect/UnionCollection.java       | 12 +++++++++++-
 .../crunch/impl/spark/collect/UnionTable.java     | 10 +++++++++-
 3 files changed, 38 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/a96e3089/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
index db8509b..785f45a 100644
--- a/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
 import org.apache.crunch.impl.spark.SparkPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.io.To;
+import org.apache.crunch.lib.PTables;
 import org.apache.crunch.test.CrunchTestSupport;
 import org.apache.crunch.types.writable.Writables;
 import org.junit.Test;
@@ -79,6 +80,23 @@ public class SparkUnionResultsIT extends CrunchTestSupport implements Serializab
   }
 
   @Test
+  public void testMultiGroupBy() throws Exception {
+    String inputPath = tempDir.copyResourceFileName("set1.txt");
+    String inputPath2 = tempDir.copyResourceFileName("set2.txt");
+    String output = tempDir.getFileName("output");
+
+    Pipeline pipeline = new SparkPipeline("local", "multigroupby");
+
+    PCollection<String> set1Lines = pipeline.read(At.textFile(inputPath, Writables.strings()));
+    PCollection<Pair<String, Long>> set1Lengths = set1Lines.parallelDo(new StringLengthMapFn(),
+        Writables.pairs(Writables.strings(), Writables.longs()));
+    PTable<String, Long> set2Counts = pipeline.read(At.textFile(inputPath2, Writables.strings())).count();
+    PTables.asPTable(set2Counts.union(set1Lengths)).groupByKey().ungroup()
+        .write(At.sequenceFile(output, Writables.strings(), Writables.longs()));
+    pipeline.done();
+  }
+
+  @Test
   public void testMultiWrite() throws Exception {
     String inputPath = tempDir.copyResourceFileName("set1.txt");
     String inputPath2 = tempDir.copyResourceFileName("set2.txt");

http://git-wip-us.apache.org/repos/asf/crunch/blob/a96e3089/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionCollection.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionCollection.java
index 4e8b25a..5c18665 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionCollection.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionCollection.java
@@ -17,12 +17,17 @@
  */
 package org.apache.crunch.impl.spark.collect;
 
+import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.dist.collect.BaseUnionCollection;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.dist.collect.PTableBase;
 import org.apache.crunch.impl.spark.SparkCollection;
 import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.crunch.impl.spark.fn.FlatMapPairDoFn;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.api.java.function.Function;
 import org.apache.spark.storage.StorageLevel;
 
 import java.util.List;
@@ -51,7 +56,12 @@ public class UnionCollection<S> extends BaseUnionCollection<S> implements SparkC
     List<PCollectionImpl<?>> parents = getParents();
     JavaRDD[] rdds = new JavaRDD[parents.size()];
     for (int i = 0; i < rdds.length; i++) {
-      rdds[i] = (JavaRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime);
+      if (parents.get(i) instanceof PTableBase) {
+        JavaPairRDD prdd = (JavaPairRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime);
+        rdds[i] = prdd.mapPartitions(new FlatMapPairDoFn(IdentityFn.getInstance(), runtime.getRuntimeContext()));
+      } else {
+        rdds[i] = (JavaRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime);
+      }
     }
     return runtime.getSparkContext().union(rdds);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a96e3089/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java
index 867a95d..b2776c5 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java
@@ -17,12 +17,15 @@
  */
 package org.apache.crunch.impl.spark.collect;
 
+import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.dist.collect.BaseUnionTable;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.dist.collect.PTableBase;
 import org.apache.crunch.impl.spark.SparkCollection;
 import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.crunch.impl.spark.fn.PairFlatMapDoFn;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.storage.StorageLevel;
 
@@ -52,7 +55,12 @@ public class UnionTable<K, V> extends BaseUnionTable<K, V> implements SparkColle
     List<PCollectionImpl<?>> parents = getParents();
     JavaPairRDD[] rdds = new JavaPairRDD[parents.size()];
     for (int i = 0; i < rdds.length; i++) {
-      rdds[i] = (JavaPairRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime);
+      if (parents.get(i) instanceof PTableBase) {
+        rdds[i] = (JavaPairRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime);
+      } else {
+        JavaRDD rdd = (JavaRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime);
+        rdds[i] = rdd.mapPartitions(new PairFlatMapDoFn(IdentityFn.getInstance(), runtime.getRuntimeContext()));
+      }
     }
     return runtime.getSparkContext().union(rdds);
   }


Mime
View raw message