crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: Change usage of Iterable in PGroupedTable to TraversableOnce/Iterator to prevent SingleUseIterator exceptions in Scrunch
Date Thu, 07 Aug 2014 18:10:34 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 3a760cdae -> ea777acdf


Change usage of Iterable in PGroupedTable to TraversableOnce/Iterator to prevent SingleUseIterator exceptions in Scrunch

Signed-off-by: Josh Wills <jwills@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ea777acd
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ea777acd
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ea777acd

Branch: refs/heads/apache-crunch-0.8
Commit: ea777acdf15987bf78afcd76980416d84ddef37e
Parents: 3a760cd
Author: David Whiting <davw@spotify.com>
Authored: Mon Aug 4 10:20:22 2014 -0400
Committer: Josh Wills <jwills@apache.org>
Committed: Thu Aug 7 09:34:59 2014 -0700

----------------------------------------------------------------------
 .../apache/crunch/scrunch/PGroupedTable.scala   | 85 ++++++++++----------
 .../crunch/scrunch/PGroupedTableTest.scala      | 35 ++++++++
 2 files changed, 77 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ea777acd/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala
index c86611a..bd48202 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala
@@ -19,7 +19,6 @@ package org.apache.crunch.scrunch
 
 import org.apache.crunch.{PCollection => JCollection, PGroupedTable => JGroupedTable,
Pair => CPair, _}
 import java.lang.{Iterable => JIterable}
-import scala.collection.Iterable
 import scala.collection.JavaConversions._
 import org.apache.hadoop.mapreduce.TaskInputOutputContext
 
@@ -27,21 +26,21 @@ class PGroupedTable[K, V](val native: JGroupedTable[K, V])
     extends PCollectionLike[CPair[K, JIterable[V]], PGroupedTable[K, V], JGroupedTable[K,
V]] {
   import PGroupedTable._
 
-  type FunctionType[T] = (K, Iterable[V]) => T
-  type CtxtFunctionType[T] = (K, Iterable[V], TIOC) => T
+  type FunctionType[T] = (K, TraversableOnce[V]) => T
+  type CtxtFunctionType[T] = (K, TraversableOnce[V], TIOC) => T
 
-  protected def wrapFlatMapFn[T](fmt: (K, Iterable[V]) => TraversableOnce[T]) = flatMapFn(fmt)
-  protected def wrapMapFn[T](fmt: (K, Iterable[V]) => T) = mapFn(fmt)
-  protected def wrapFilterFn(fmt: (K, Iterable[V]) => Boolean) = filterFn(fmt)
-  protected def wrapFlatMapWithCtxtFn[T](fmt: (K, Iterable[V], TIOC) => TraversableOnce[T])
= {
+  protected def wrapFlatMapFn[T](fmt: (K, TraversableOnce[V]) => TraversableOnce[T]) =
flatMapFn(fmt)
+  protected def wrapMapFn[T](fmt: (K, TraversableOnce[V]) => T) = mapFn(fmt)
+  protected def wrapFilterFn(fmt: (K, TraversableOnce[V]) => Boolean) = filterFn(fmt)
+  protected def wrapFlatMapWithCtxtFn[T](fmt: (K, TraversableOnce[V], TIOC) => TraversableOnce[T])
= {
     flatMapWithCtxtFn(fmt)
   }
-  protected def wrapMapWithCtxtFn[T](fmt: (K, Iterable[V], TIOC) => T) = mapWithCtxtFn(fmt)
-  protected def wrapFilterWithCtxtFn(fmt: (K, Iterable[V], TIOC) => Boolean) = filterWithCtxtFn(fmt)
-  protected def wrapPairFlatMapFn[S, T](fmt: (K, Iterable[V]) => TraversableOnce[(S, T)])
= pairFlatMapFn(fmt)
-  protected def wrapPairMapFn[S, T](fmt: (K, Iterable[V]) => (S, T)) = pairMapFn(fmt)
+  protected def wrapMapWithCtxtFn[T](fmt: (K, TraversableOnce[V], TIOC) => T) = mapWithCtxtFn(fmt)
+  protected def wrapFilterWithCtxtFn(fmt: (K, TraversableOnce[V], TIOC) => Boolean) =
filterWithCtxtFn(fmt)
+  protected def wrapPairFlatMapFn[S, T](fmt: (K, TraversableOnce[V]) => TraversableOnce[(S,
T)]) = pairFlatMapFn(fmt)
+  protected def wrapPairMapFn[S, T](fmt: (K, TraversableOnce[V]) => (S, T)) = pairMapFn(fmt)
 
-  def combine(f: Iterable[V] => V) = combineValues(new IterableCombineFn[K, V](f))
+  def combine(f: TraversableOnce[V] => V) = combineValues(new TraversableOnceCombineFn[K,
V](f))
 
   def combineValues(agg: Aggregator[V]) = new PTable[K, V](native.combineValues(agg))
 
@@ -58,68 +57,68 @@ class PGroupedTable[K, V](val native: JGroupedTable[K, V])
   }
 }
 
-class IterableCombineFn[K, V](f: Iterable[V] => V) extends CombineFn[K, V] {
+class TraversableOnceCombineFn[K, V](f: TraversableOnce[V] => V) extends CombineFn[K,
V] {
   override def process(input: CPair[K, JIterable[V]], emitfn: Emitter[CPair[K, V]]) = {
-    emitfn.emit(CPair.of(input.first(), f(iterableAsScalaIterable[V](input.second()))))
+    emitfn.emit(CPair.of(input.first(), f(iterableAsScalaIterable[V](input.second()).iterator)))
   }
 }
 
-trait SDoGroupedFn[K, V, T] extends DoFn[CPair[K, JIterable[V]], T] with Function2[K, Iterable[V],
TraversableOnce[T]] {
+trait SDoGroupedFn[K, V, T] extends DoFn[CPair[K, JIterable[V]], T] with Function2[K, TraversableOnce[V],
TraversableOnce[T]] {
   override def process(input: CPair[K, JIterable[V]], emitter: Emitter[T]) {
-    for (v <- apply(input.first(), iterableAsScalaIterable[V](input.second()))) {
+    for (v <- apply(input.first(), iterableAsScalaIterable[V](input.second()).iterator))
{
       emitter.emit(v)
     }
   }
 }
 
-trait SMapGroupedFn[K, V, T] extends MapFn[CPair[K, JIterable[V]], T] with Function2[K, Iterable[V],
T] {
+trait SMapGroupedFn[K, V, T] extends MapFn[CPair[K, JIterable[V]], T] with Function2[K, TraversableOnce[V],
T] {
   override def map(input: CPair[K, JIterable[V]]) = {
-    apply(input.first(), iterableAsScalaIterable[V](input.second()))
+    apply(input.first(), iterableAsScalaIterable[V](input.second()).iterator)
   }
 }
 
-trait SFilterGroupedFn[K, V] extends FilterFn[CPair[K, JIterable[V]]] with Function2[K, Iterable[V],
Boolean] {
+trait SFilterGroupedFn[K, V] extends FilterFn[CPair[K, JIterable[V]]] with Function2[K, TraversableOnce[V],
Boolean] {
   override def accept(input: CPair[K, JIterable[V]]) = {
-    apply(input.first(), iterableAsScalaIterable[V](input.second()))
+    apply(input.first(), iterableAsScalaIterable[V](input.second()).iterator)
   }
 }
 
-class SDoGroupedWithCtxtFn[K, V, T](val f: (K, Iterable[V], TaskInputOutputContext[_, _,
_, _]) => TraversableOnce[T])
+class SDoGroupedWithCtxtFn[K, V, T](val f: (K, TraversableOnce[V], TaskInputOutputContext[_,
_, _, _]) => TraversableOnce[T])
   extends DoFn[CPair[K, JIterable[V]], T] {
   override def process(input: CPair[K, JIterable[V]], emitter: Emitter[T]) {
-    for (v <- f(input.first(), iterableAsScalaIterable[V](input.second()), getContext))
{
+    for (v <- f(input.first(), iterableAsScalaIterable[V](input.second()).iterator, getContext))
{
       emitter.emit(v)
     }
   }
 }
 
-class SMapGroupedWithCtxtFn[K, V, T](val f: (K, Iterable[V], TaskInputOutputContext[_, _,
_, _]) => T)
+class SMapGroupedWithCtxtFn[K, V, T](val f: (K, TraversableOnce[V], TaskInputOutputContext[_,
_, _, _]) => T)
   extends MapFn[CPair[K, JIterable[V]], T] {
   override def map(input: CPair[K, JIterable[V]]) = {
-    f(input.first(), iterableAsScalaIterable[V](input.second()), getContext)
+    f(input.first(), iterableAsScalaIterable[V](input.second()).iterator, getContext)
   }
 }
 
-class SFilterGroupedWithCtxtFn[K, V](val f: (K, Iterable[V], TaskInputOutputContext[_, _,
_, _]) => Boolean)
+class SFilterGroupedWithCtxtFn[K, V](val f: (K, TraversableOnce[V], TaskInputOutputContext[_,
_, _, _]) => Boolean)
   extends FilterFn[CPair[K, JIterable[V]]] {
   override def accept(input: CPair[K, JIterable[V]]) = {
-    f.apply(input.first(), iterableAsScalaIterable[V](input.second()), getContext)
+    f.apply(input.first(), iterableAsScalaIterable[V](input.second()).iterator, getContext)
   }
 }
 
 trait SDoPairGroupedFn[K, V, S, T] extends DoFn[CPair[K, JIterable[V]], CPair[S, T]]
-    with Function2[K, Iterable[V], TraversableOnce[(S, T)]] {
+    with Function2[K, TraversableOnce[V], TraversableOnce[(S, T)]] {
   override def process(input: CPair[K, JIterable[V]], emitter: Emitter[CPair[S, T]]) {
-    for (v <- apply(input.first(), iterableAsScalaIterable[V](input.second()))) {
+    for (v <- apply(input.first(), iterableAsScalaIterable[V](input.second()).iterator))
{
       emitter.emit(CPair.of(v._1, v._2))
     }
   }
 }
 
 trait SMapPairGroupedFn[K, V, S, T] extends MapFn[CPair[K, JIterable[V]], CPair[S, T]]
-    with Function2[K, Iterable[V], (S, T)] {
+    with Function2[K, TraversableOnce[V], (S, T)] {
   override def map(input: CPair[K, JIterable[V]]) = {
-    val t = apply(input.first(), iterableAsScalaIterable[V](input.second()))
+    val t = apply(input.first(), iterableAsScalaIterable[V](input.second()).iterator)
     CPair.of(t._1, t._2)
   }
 }
@@ -127,35 +126,35 @@ trait SMapPairGroupedFn[K, V, S, T] extends MapFn[CPair[K, JIterable[V]],
CPair[
 object PGroupedTable {
   type TIOC = TaskInputOutputContext[_, _, _, _]
 
-  def flatMapFn[K, V, T](fn: (K, Iterable[V]) => TraversableOnce[T]) = {
-    new SDoGroupedFn[K, V, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
+  def flatMapFn[K, V, T](fn: (K, TraversableOnce[V]) => TraversableOnce[T]) = {
+    new SDoGroupedFn[K, V, T] { def apply(k: K, v: TraversableOnce[V]) = fn(k, v) }
   }
 
-  def mapFn[K, V, T](fn: (K, Iterable[V]) => T) = {
-    new SMapGroupedFn[K, V, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
+  def mapFn[K, V, T](fn: (K, TraversableOnce[V]) => T) = {
+    new SMapGroupedFn[K, V, T] { def apply(k: K, v: TraversableOnce[V]) = fn(k, v) }
   }
 
-  def filterFn[K, V](fn: (K, Iterable[V]) => Boolean) = {
-    new SFilterGroupedFn[K, V] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
+  def filterFn[K, V](fn: (K, TraversableOnce[V]) => Boolean) = {
+    new SFilterGroupedFn[K, V] { def apply(k: K, v: TraversableOnce[V]) = fn(k, v) }
   }
 
-  def flatMapWithCtxtFn[K, V, T](fn: (K, Iterable[V], TIOC) => TraversableOnce[T]) = {
+  def flatMapWithCtxtFn[K, V, T](fn: (K, TraversableOnce[V], TIOC) => TraversableOnce[T])
= {
     new SDoGroupedWithCtxtFn[K, V, T](fn)
   }
 
-  def mapWithCtxtFn[K, V, T](fn: (K, Iterable[V], TIOC) => T) = {
+  def mapWithCtxtFn[K, V, T](fn: (K, TraversableOnce[V], TIOC) => T) = {
     new SMapGroupedWithCtxtFn[K, V, T](fn)
   }
 
-  def filterWithCtxtFn[K, V](fn: (K, Iterable[V], TIOC) => Boolean) = {
+  def filterWithCtxtFn[K, V](fn: (K, TraversableOnce[V], TIOC) => Boolean) = {
     new SFilterGroupedWithCtxtFn[K, V](fn)
   }
 
-  def pairMapFn[K, V, S, T](fn: (K, Iterable[V]) => (S, T)) = {
-    new SMapPairGroupedFn[K, V, S, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
+  def pairMapFn[K, V, S, T](fn: (K, TraversableOnce[V]) => (S, T)) = {
+    new SMapPairGroupedFn[K, V, S, T] { def apply(k: K, v: TraversableOnce[V]) = fn(k, v)
}
   }
 
-  def pairFlatMapFn[K, V, S, T](fn: (K, Iterable[V]) => TraversableOnce[(S, T)]) = {
-    new SDoPairGroupedFn[K, V, S, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
+  def pairFlatMapFn[K, V, S, T](fn: (K, TraversableOnce[V]) => TraversableOnce[(S, T)])
= {
+    new SDoPairGroupedFn[K, V, S, T] { def apply(k: K, v: TraversableOnce[V]) = fn(k, v)
}
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/ea777acd/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PGroupedTableTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PGroupedTableTest.scala
b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PGroupedTableTest.scala
new file mode 100644
index 0000000..f5cfcf9
--- /dev/null
+++ b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PGroupedTableTest.scala
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch
+
+import org.junit.Test
+import org.junit.Assert._
+
+class PGroupedTableTest extends CrunchSuite {
+  @Test
+  def testReduceInCombine() {
+    val aSum = Mem.collectionOf(("a", 1), ("a", 2), ("b", 1))
+      .map{ case (a,b) => (a,b) }
+      .groupByKey()
+      .combine(_.reduce {_+_} )
+      .asMap()
+      .value()("a")
+
+    assertEquals(3, aSum)
+  }
+}
\ No newline at end of file


Mime
View raw message