incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-30 Add cross join to Scrunch PTable and PCollection
Date Mon, 06 Aug 2012 23:58:35 GMT
Updated Branches:
  refs/heads/master daa16452c -> fd1fc3ad6


CRUNCH-30 Add cross join to Scrunch PTable and PCollection

This commit makes the lib.Cartesian#cross function easily available
to Scrunch.  It also adds an accessor for PTable.materializeToMap().

A test is included.

Signed-off-by: jwills <jwills@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/fd1fc3ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/fd1fc3ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/fd1fc3ad

Branch: refs/heads/master
Commit: fd1fc3ad69334c25dfaee044785385ba95981f91
Parents: daa1645
Author: Brian Martin <brianmartin@gmail.com>
Authored: Mon Aug 6 15:57:00 2012 -0700
Committer: jwills <jwills@apache.org>
Committed: Mon Aug 6 16:14:12 2012 -0700

----------------------------------------------------------------------
 .../scala/org/apache/scrunch/PCollection.scala     |   10 ++-
 .../src/main/scala/org/apache/scrunch/PTable.scala |   14 +++-
 .../scala/org/apache/scrunch/CrossJoinTest.scala   |   61 +++++++++++++++
 3 files changed, 83 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fd1fc3ad/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala b/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
index d7ebee5..0924587 100644
--- a/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
+++ b/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConversions
 
 import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
 import org.apache.crunch.{PCollection => JCollection, PTable => JTable, Pair => CPair, Target}
-import org.apache.crunch.lib.Aggregate
+import org.apache.crunch.lib.{Aggregate, Cartesian}
 import org.apache.scrunch.Conversions._
 import org.apache.scrunch.interpreter.InterpreterRunner
 
@@ -54,6 +54,12 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
     by(f).groupByKey
   }
 
+  def cross[S2](other: PCollection[S2]): PCollection[(S, S2)] = {
+    val inter = Cartesian.cross(this.native, other.native)
+    val f = (in: CPair[S, S2]) => (in.first(), in.second())
+    inter.parallelDo(mapFn(f), getTypeFamily().tuple2(pType, other.pType))
+  }
+
   def materialize() = {
     InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
     JavaConversions.iterableAsScalaIterable[S](native.materialize)
@@ -67,6 +73,8 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
   }
 
   def max() = wrap(Aggregate.max(native))
+
+  def pType = native.getPType()
 }
 
 trait SDoFn[S, T] extends DoFn[S, T] with Function1[S, Traversable[T]] {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fd1fc3ad/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PTable.scala b/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
index 984d9dc..b6d95a6 100644
--- a/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
+++ b/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConversions._
 
 import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
 import org.apache.crunch.{GroupingOptions, PTable => JTable, Pair => CPair}
-import org.apache.crunch.lib.{Join, Aggregate, Cogroup, PTables}
+import org.apache.crunch.lib.{Join, Cartesian, Aggregate, Cogroup, PTables}
 import org.apache.scrunch.interpreter.InterpreterRunner
 
 class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]] {
@@ -100,6 +100,13 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
     join[V2](Join.fullJoin[K, V, V2](_, _), other)
   }
 
+  def cross[K2, V2](other: PTable[K2, V2]): PTable[(K, K2), (V, V2)] = {
+    val ptf = getTypeFamily()
+    val inter = new PTable(Cartesian.cross(this.native, other.native))
+    val f = (k: CPair[K,K2], v: CPair[V,V2]) => CPair.of((k.first(), k.second()), (v.first(), v.second()))
+    inter.parallelDo(mapFn(f), ptf.tableOf(ptf.tuple2(keyType, other.keyType), ptf.tuple2(valueType, other.valueType)))
+  }
+
   def top(limit: Int, maximize: Boolean) = {
     wrap(Aggregate.top(this.native, limit, maximize))
   }
@@ -121,6 +128,11 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
     native.materialize.view.map(x => (x.first, x.second))
   }
 
+  def materializeToMap(): Map[K, V] = {
+    InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
+    native.materializeToMap().view.toMap
+  }
+
   def keyType() = native.getPTableType().getKeyType()
 
   def valueType() = native.getPTableType().getValueType()

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fd1fc3ad/scrunch/src/test/scala/org/apache/scrunch/CrossJoinTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/org/apache/scrunch/CrossJoinTest.scala b/scrunch/src/test/scala/org/apache/scrunch/CrossJoinTest.scala
new file mode 100644
index 0000000..bbbf849
--- /dev/null
+++ b/scrunch/src/test/scala/org/apache/scrunch/CrossJoinTest.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Test
+
+class CrossJoinTest extends JUnitSuite {
+
+  @Test
+  def testCrossCollection() {
+    val testCases = List(Array(1,2,3,4,5), Array(6,7,8), Array.empty[Int])
+    val testCasePairs = testCases flatMap {test1 => testCases map {test2 => (test1,test2)}}
+
+    for ((test1, test2) <- testCasePairs) {
+      val X = Mem.collectionOf(test1: _*)
+      val Y = Mem.collectionOf(test2: _*)
+      val cross = X.cross(Y)
+
+      val crossSet = cross.materialize().toSet
+
+      assert(crossSet.size == test1.size * test2.size)
+      assert(test1.flatMap(t1 => test2.map(t2 => crossSet.contains((t1, t2)))).forall(_ == true))
+
+    }
+  }
+
+  @Test
+  def testCrossTable() {
+    val testCases = List(Array((1,2),(3,4),(5,6)), Array((7,8),(9,10)), Array.empty[(Int,Int)])
+    val testCasePairs = testCases flatMap {test1 => testCases map {test2 => (test1,test2)}}
+
+    for ((test1, test2) <- testCasePairs) {
+      val X = Mem.tableOf(test1)
+      val Y = Mem.tableOf(test2)
+      val cross = X.cross(Y)
+
+      val crossSet = cross.materializeToMap().toSet
+      val actualCross = test1.flatMap(t1 => test2.map(t2 => ((t1._1, t2._1), (t1._2, t2._2))))
+
+      assert(crossSet.size == test1.size * test2.size)
+      assert(actualCross.map(crossSet.contains(_)).forall(_ == true))
+    }
+  }
+
+}


Mime
View raw message