Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7B6F511641 for ; Fri, 20 Jun 2014 03:43:50 +0000 (UTC) Received: (qmail 96917 invoked by uid 500); 20 Jun 2014 03:43:50 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 96874 invoked by uid 500); 20 Jun 2014 03:43:50 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 96864 invoked by uid 99); 20 Jun 2014 03:43:50 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Jun 2014 03:43:50 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id D8859986263; Fri, 20 Jun 2014 03:43:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: <3c7499082efd4edaa0fa6254f6e0e79e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-424: Aggregators for Scrunch based on Algebird's Monoid type (for sums) and Scala's Ordering type (for min/max). Date: Fri, 20 Jun 2014 03:43:49 +0000 (UTC) Repository: crunch Updated Branches: refs/heads/apache-crunch-0.8 24b0513ce -> a951068bc CRUNCH-424: Aggregators for Scrunch based on Algebird's Monoid type (for sums) and Scala's Ordering type (for min/max). 
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a951068b Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a951068b Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a951068b Branch: refs/heads/apache-crunch-0.8 Commit: a951068bcac1418681f29ff9ca7dbc9bff2cde20 Parents: 24b0513 Author: Josh Wills Authored: Tue Jun 17 23:40:32 2014 -0700 Committer: Josh Wills Committed: Thu Jun 19 20:42:01 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/PCollection.java | 1 - .../java/org/apache/crunch/fn/Aggregators.java | 2 +- .../java/org/apache/crunch/lib/Aggregate.java | 8 +- crunch-scrunch/pom.xml | 4 + .../apache/crunch/scrunch/ScalaTypesTest.scala | 4 +- .../org/apache/crunch/scrunch/Aggregators.scala | 271 +++++++++++++++++++ .../org/apache/crunch/scrunch/PCollection.scala | 2 + .../apache/crunch/scrunch/AggregatorsTest.scala | 49 ++++ pom.xml | 7 + 9 files changed, 340 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/a951068b/crunch-core/src/main/java/org/apache/crunch/PCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PCollection.java b/crunch-core/src/main/java/org/apache/crunch/PCollection.java index bf5bacc..878fbb9 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/PCollection.java @@ -277,5 +277,4 @@ public interface PCollection { * Returns a {@code PCollection} that contains the result of aggregating all values in this instance. 
*/ PCollection aggregate(Aggregator aggregator); - } http://git-wip-us.apache.org/repos/asf/crunch/blob/a951068b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java index 0ac79e2..084cca4 100644 --- a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java +++ b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java @@ -121,7 +121,7 @@ public final class Aggregators { * @return The newly constructed instance */ public static Aggregator MAX_LONGS(int n) { - return new MaxLongs(); + return new MaxNAggregator(n); } /** http://git-wip-us.apache.org/repos/asf/crunch/blob/a951068b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java index 7a71646..68c23e1 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java @@ -301,11 +301,11 @@ public class Aggregate { public static PCollection aggregate(PCollection collect, Aggregator aggregator) { PTypeFamily tf = collect.getTypeFamily(); - return collect.parallelDo("Aggregate.aggregator", new MapFn>() { - public Pair map(S input) { - return Pair.of(false, input); + return collect.parallelDo("Aggregate.aggregator", new MapFn>() { + public Pair map(S input) { + return Pair.of(null, input); } - }, tf.tableOf(tf.booleans(), collect.getPType())) + }, tf.tableOf(tf.nulls(), collect.getPType())) .groupByKey(1) .combineValues(aggregator) .values(); http://git-wip-us.apache.org/repos/asf/crunch/blob/a951068b/crunch-scrunch/pom.xml 
---------------------------------------------------------------------- diff --git a/crunch-scrunch/pom.xml b/crunch-scrunch/pom.xml index 6c3943f..e97133a 100644 --- a/crunch-scrunch/pom.xml +++ b/crunch-scrunch/pom.xml @@ -42,6 +42,10 @@ under the License. jline + com.twitter + algebird-core_${scala.base.version} + + org.apache.crunch crunch-core http://git-wip-us.apache.org/repos/asf/crunch/blob/a951068b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala index de9a5f9..e4dc771 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala @@ -40,10 +40,10 @@ class ScalaTypesTest extends CrunchSuite { val out = pipeline.read(From.textFile(shakespeare)) .map(x => if (x.startsWith("a")) Some(x) else None) + .aggregate(Aggregators.sum) // uses Algebird Monoid[Option[String]] .materialize - .take(100) - pipeline.done assert(out.exists(!_.isEmpty)) + pipeline.done } @Test http://git-wip-us.apache.org/repos/asf/crunch/blob/a951068b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Aggregators.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Aggregators.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Aggregators.scala new file mode 100644 index 0000000..238a01f --- /dev/null +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Aggregators.scala @@ -0,0 +1,271 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. 
See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.crunch.scrunch + +import org.apache.crunch.fn.{Aggregators => JAgg} +import org.apache.crunch._ +import com.google.common.collect.ImmutableList +import org.apache.hadoop.conf.Configuration +import java.lang.{Iterable => JIterable} +import scala.collection.JavaConversions +import com.twitter.algebird.Monoid + +/** + * Scrunch versions of the common Aggregator types from Crunch. 
+ */ +object Aggregators { + + import scala.math.Ordering._ + + def sum[T: Monoid]: Aggregator[T] = sumUsing(implicitly[Monoid[T]]) + + def sumUsing[T](m: Monoid[T]): Aggregator[T] = new SimpleAggregator[T] { + def reset { + sum = m.zero + } + + def update(next: T) { + sum = m.plus(sum, next) + } + + def results: JIterable[T] = { + return ImmutableList.of(sum) + } + + private var sum: T = m.zero + } + + def max[T: Ordering] = new SimpleAggregator[T] { + def reset { + max = None + } + + def update(next: T) { + if (max.isEmpty || implicitly[Ordering[T]].lt(max.get, next)) { + max = Some(next) + } + } + + def results: JIterable[T] = { + return JavaConversions.asJavaIterable(max.toIterable) + } + + private var max: Option[T] = None + } + + def min[T: Ordering] = new SimpleAggregator[T] { + def reset { + min = None + } + + def update(next: T) { + if (min.isEmpty || implicitly[Ordering[T]].gt(min.get, next)) { + min = Some(next) + } + } + + def results: JIterable[T] = { + return JavaConversions.asJavaIterable(min.toIterable) + } + + private var min: Option[T] = None + } + + /** + * Return the first {@code n} values (or fewer if there are fewer values than {@code n}). + * + * @param n The number of values to return + * @return The newly constructed instance + */ + def first[V](n: Int): Aggregator[V] = JAgg.FIRST_N(n) + + /** + * Return the last {@code n} values (or fewer if there are fewer values than {@code n}). + * + * @param n The number of values to return + * @return The newly constructed instance + */ + def last[V](n: Int) = JAgg.LAST_N(n) + + /** + * Concatenate strings, with a separator between strings. There + * is no limits of length for the concatenated string. + * + *

Note: String concatenation is not commutative, which means the + * result of the aggregation is not deterministic!

+ * + * @param separator + * the separator which will be appended between each string + * @param skipNull + * define if we should skip null values. Throw + * NullPointerException if set to false and there is a null + * value. + * @return The newly constructed instance + */ + def concat(separator: String, skipNull: Boolean) = JAgg.STRING_CONCAT(separator, skipNull) + + /** + * Concatenate strings, with a separator between strings. You can specify + * the maximum length of the output string and of the input strings, if + * they are > 0. If a value is <= 0, there is no limit. + * + *

Any too large string (or any string which would make the output too + * large) will be silently discarded.

+ * + *

Note: String concatenation is not commutative, which means the + * result of the aggregation is not deterministic!

+ * + * @param separator + * the separator which will be appended between each string + * @param skipNull + * define if we should skip null values. Throw + * NullPointerException if set to false and there is a null + * value. + * @param maxOutputLength + * the maximum length of the output string. If it's set <= 0, + * there is no limit. The number of characters of the output + * string will be < maxOutputLength. + * @param maxInputLength + * the maximum length of the input strings. If it's set <= 0, + * there is no limit. The number of characters of the input string + * will be < maxInputLength to be concatenated. + * @return The newly constructed instance + */ + def concat(separator: String, skipNull: Boolean, maxOutputLength: Long, maxInputLength: Long) = + JAgg.STRING_CONCAT(separator, skipNull, maxOutputLength, maxInputLength) + + /** + * Collect the unique elements of the input, as defined by the {@code equals} method for + * the input objects. No guarantees are made about the order in which the final elements + * will be returned. + * + * @return The newly constructed instance + */ + def unique[V]: Aggregator[V] = JAgg.UNIQUE_ELEMENTS() + + /** + * Collect a sample of unique elements from the input, where 'unique' is defined by + * the {@code equals} method for the input objects. No guarantees are made about which + * elements will be returned, simply that there will not be any more than the given sample + * size for any key. + * + * @param maximumSampleSize The maximum number of unique elements to return per key + * @return The newly constructed instance + */ + def sampleUnique(maximumSampleSize: Int) = JAgg.SAMPLE_UNIQUE_ELEMENTS(maximumSampleSize) + + /** + * Apply separate aggregators to each component of a {@link Tuple2}. + */ + def pair[V1, V2](a1: Aggregator[V1], a2: Aggregator[V2]): Aggregator[(V1, V2)] = { + return new Aggregators.PairAggregator[V1, V2](a1, a2) + } + + /** + * Apply separate aggregators to each component of a {@link Tuple3}. 
+ */ + def trip[V1, V2, V3](a1: Aggregator[V1], a2: Aggregator[V2], a3: Aggregator[V3]): Aggregator[(V1, V2, V3)] = { + return new Aggregators.TripAggregator[V1, V2, V3](a1, a2, a3) + } + + /** + * Apply separate aggregators to each component of a {@link Tuple4}. + */ + def quad[V1, V2, V3, V4](a1: Aggregator[V1], a2: Aggregator[V2], a3: Aggregator[V3], a4: Aggregator[V4]) + : Aggregator[(V1, V2, V3, V4)] = { + return new Aggregators.QuadAggregator[V1, V2, V3, V4](a1, a2, a3, a4) + } + + /** + * Base class for aggregators that do not require any initialization. + */ + abstract class SimpleAggregator[T] extends Aggregator[T] { + def initialize(conf: Configuration) { + } + } + + private abstract class ProductAggregator[T <: Product](aggs: Array[Aggregator[_]]) extends Aggregator[T] { + def initialize(configuration: Configuration) { + for (a <- aggs) { + a.initialize(configuration) + } + } + + def reset { + for (a <- aggs) { + a.reset + } + } + + protected def updateTuple(t: T) { + var i: Int = 0 + while (i < aggs.length) { + aggs(i).asInstanceOf[Aggregator[Any]].update(t.productElement(i)) + i = i + 1 + } + } + + protected def results[T](index: Int): Iterable[T] = { + return JavaConversions.iterableAsScalaIterable(aggs(index).results.asInstanceOf[JIterable[T]]) + } + } + + private class PairAggregator[A, B](val a1: Aggregator[A], val a2: Aggregator[B]) + extends ProductAggregator[(A, B)](Array(a1, a2)) { + + def update(value: (A, B)) { + updateTuple(value) + } + + def results: JIterable[(A, B)] = { + return JavaConversions.asJavaIterable(results[A](0).zip(results[B](1))) + } + } + + private class TripAggregator[A, B, C](val a1: Aggregator[A], val a2: Aggregator[B], val a3: Aggregator[C]) + extends ProductAggregator[(A, B, C)](Array(a1, a2, a3)) { + def update(value: (A, B, C)) { + updateTuple(value) + } + + def results: JIterable[(A, B, C)] = { + return JavaConversions.asJavaIterable( + results[A](0).zip(results[B](1)).zip(results[C](2)) + .map(x => (x._1._1, 
x._1._2, x._2))) + } + } + + private class QuadAggregator[A, B, C, D](val a1: Aggregator[A], val a2: Aggregator[B], + val a3: Aggregator[C], val a4: Aggregator[D]) + extends ProductAggregator[(A, B, C, D)](Array(a1, a2, a3, a4)) { + + def update(value: (A, B, C, D)) { + updateTuple(value) + } + + def results: JIterable[(A, B, C, D)] = { + return JavaConversions.asJavaIterable( + (results[A](0).zip(results[B](1))).zip(results[C](2).zip(results[D](3))) + .map(x => (x._1._1, x._1._2, x._2._1, x._2._2))) + } + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/a951068b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala index e2f7b5b..ed39d30 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala @@ -53,6 +53,8 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol new PCollection[S](native.union(others.map(_.native) : _*)) } + def aggregate(agg: Aggregator[S]) = wrap(native.aggregate(agg)) + def by[K: PTypeH](f: S => K): PTable[K, S] = { val ptype = getTypeFamily().tableOf(implicitly[PTypeH[K]].get(getTypeFamily()), native.getPType()) parallelDo(mapKeyFn[S, K](f), ptype) http://git-wip-us.apache.org/repos/asf/crunch/blob/a951068b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/AggregatorsTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/AggregatorsTest.scala b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/AggregatorsTest.scala new file mode 100644 index 0000000..e5d4d78 --- /dev/null +++ 
b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/AggregatorsTest.scala @@ -0,0 +1,49 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.crunch.scrunch + +import org.scalatest.junit.JUnitSuite +import _root_.org.junit.{Assert, Test} + +class AggregatorsTest extends JUnitSuite { + + @Test def testSum { + val pc = Mem.collectionOf((1, 0.1), (2, 1.2), (3, 2.2), (4, 2.0), (5, 0.0)) + val sum = pc.aggregate(Aggregators.sum).materialize().toList + Assert.assertEquals(1, sum.size) + Assert.assertEquals(15, sum(0)._1) + Assert.assertEquals(5.5, sum(0)._2, 0.001) + } + + @Test def testMin { + val pc = Mem.collectionOf(1, 2, 3, 4, 5) + val min = pc.aggregate(Aggregators.min).materialize().toList + Assert.assertEquals(1, min.size) + Assert.assertEquals(1, min(0)) + } + + @Test def testMax { + val pc = Mem.collectionOf(1, 2, 3, 4, 5) + val max = pc.aggregate(Aggregators.max).materialize().toList + Assert.assertEquals(1, max.size) + Assert.assertEquals(5, max(0)) + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/a951068b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 
3aa1f79..fab80b5 100644 --- a/pom.xml +++ b/pom.xml @@ -81,6 +81,7 @@ under the License. 0.8.0 1.6.1 1.2.15 + 0.6.0 4.10 1.1 1.9.0 @@ -360,6 +361,12 @@ under the License. + com.twitter + algebird-core_${scala.base.version} + ${algebird.version} + + + org.scala-lang scala-library ${scala.version}