Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D34CA1767C for ; Mon, 12 Jan 2015 18:26:00 +0000 (UTC) Received: (qmail 67902 invoked by uid 500); 12 Jan 2015 18:26:02 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 67872 invoked by uid 500); 12 Jan 2015 18:26:02 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 67863 invoked by uid 99); 12 Jan 2015 18:26:02 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Jan 2015 18:26:02 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 0691DA01C22; Mon, 12 Jan 2015 18:26:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: crunch git commit: CRUNCH-484: Add library features from spotify/crunch-lib into crunch-core. Date: Mon, 12 Jan 2015 18:26:01 +0000 (UTC) Repository: crunch Updated Branches: refs/heads/master 48e4e7941 -> 3477ea431 CRUNCH-484: Add library features from spotify/crunch-lib into crunch-core. 
Signed-off-by: Josh Wills Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3477ea43 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3477ea43 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3477ea43 Branch: refs/heads/master Commit: 3477ea431015a81ad2cee825ca9410f6da441dee Parents: 48e4e79 Author: David Whiting Authored: Mon Jan 12 15:04:27 2015 +0100 Committer: Josh Wills Committed: Mon Jan 12 10:15:55 2015 -0800 ---------------------------------------------------------------------- .../java/org/apache/crunch/lib/Average.java | 59 +++++ .../main/java/org/apache/crunch/lib/DoFns.java | 101 ++++++++ .../java/org/apache/crunch/lib/PTables.java | 17 ++ .../java/org/apache/crunch/lib/Quantiles.java | 254 +++++++++++++++++++ .../java/org/apache/crunch/lib/TopList.java | 110 ++++++++ .../java/org/apache/crunch/lib/AverageTest.java | 52 ++++ .../java/org/apache/crunch/lib/DoFnsTest.java | 86 +++++++ .../java/org/apache/crunch/lib/PTablesTest.java | 40 +++ .../org/apache/crunch/lib/QuantilesTest.java | 126 +++++++++ .../java/org/apache/crunch/lib/TopListTest.java | 68 +++++ 10 files changed, 913 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/main/java/org/apache/crunch/lib/Average.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Average.java b/crunch-core/src/main/java/org/apache/crunch/lib/Average.java new file mode 100644 index 0000000..df6ed3c --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Average.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import org.apache.crunch.MapFn; +import org.apache.crunch.PGroupedTable; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.types.PTypeFamily; + +import static org.apache.crunch.fn.Aggregators.SUM_DOUBLES; +import static org.apache.crunch.fn.Aggregators.SUM_LONGS; +import static org.apache.crunch.fn.Aggregators.pairAggregator; + +public class Average { + + /** + * Calculate the mean average value by key for a table with numeric values. + * @param table PTable of (key, value) pairs to operate on + * @param Key type, can be any type + * @param Value type, must be numeric (ie. 
extend java.lang.Number) + * @return PTable<K, Double> of (key, mean(values)) pairs + */ + public static PTable meanValue(PTable table) { + PTypeFamily ptf = table.getTypeFamily(); + PTable> withCounts = table.mapValues(new MapFn>() { + + @Override + public Pair map(V input) { + return Pair.of(input.doubleValue(), 1L); + } + }, ptf.pairs(ptf.doubles(), ptf.longs())); + PGroupedTable> grouped = withCounts.groupByKey(); + + return grouped.combineValues(pairAggregator(SUM_DOUBLES(), SUM_LONGS())) + .mapValues(new MapFn, Double>() { + @Override + public Double map(Pair input) { + return input.first() / input.second(); + } + }, ptf.doubles()); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java b/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java new file mode 100644 index 0000000..cbf819f --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.Pair; +import org.apache.crunch.types.PType; +import org.apache.hadoop.conf.Configuration; + +import java.io.Serializable; + +public class DoFns { + /** + * "Reduce" DoFn wrapper which detaches the values in the iterable, preventing the unexpected behaviour related to + * object reuse often observed when using Avro. Wrap your DoFn in a detach(...) and pass in a PType for the Iterable + * value, and then you'll be handed an Iterable of real distinct objects, instead of the same object being handed to + * you multiple times with different data. + * + * You should use this when you have a parallelDo after a groupBy, and you'd like to capture the objects arriving in + * the Iterable part of the incoming Pair and pass it through to the output (for example if you want to create an + * array of outputs from the values to be output as one record). + * + * This will incur a performance hit, as it means that every object read from the Iterable will allocate a new Java + * object for the record and objects for all its non-primitive fields too. If you are rolling up records into a + * collection then this will be necessary anyway, but if you are only outputting derived data this may impact the + * speed and memory usage of your job unnecessarily. 
+ * + * @param reduceFn Underlying DoFn to wrap + * @param valueType PType of the object contained within the Iterable + * @param Reduce key + * @param Iterable value + * @param Output type of DoFn + * @return DoFn which will detach values for you + */ + public static DoFn>, T> detach(final DoFn>, T> reduceFn, final PType valueType) { + return new DetachingDoFn(reduceFn, valueType); + } + + private static class DetachFunction implements Function, Serializable { + private final PType pType; + + public DetachFunction(PType initializedPType) { + this.pType = initializedPType; + } + + @Override + public T apply(T t) { + return pType.getDetachedValue(t); + } + } + + private static class DetachingDoFn extends DoFn>, T> { + + private final DoFn>, T> reduceFn; + private final PType valueType; + + public DetachingDoFn(DoFn>, T> reduceFn, PType valueType) { + this.reduceFn = reduceFn; + this.valueType = valueType; + } + + @Override + public void configure(Configuration configuration) { + super.configure(configuration); + reduceFn.configure(configuration); + } + + @Override + public void initialize() { + reduceFn.initialize(); + valueType.initialize(getConfiguration() == null ? 
new Configuration() : getConfiguration()); + } + + @Override + public void process(Pair> input, Emitter emitter) { + reduceFn.process(Pair.of(input.first(), detachIterable(input.second(), valueType)), emitter); + } + + public Iterable detachIterable(Iterable iterable, final PType pType) { + return Iterables.transform(iterable, new DetachFunction(pType)); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java b/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java index e0a3bf3..465e3f1 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java @@ -213,4 +213,21 @@ public class PTables { return Pair.of(tableType.getKeyType().getDetachedValue(value.first()), (Iterable) detachedIterable); } + + /** + * Swap the key and value part of a table. 
The original PTypes are used in the opposite order + * @param table PTable to process + * @param Key type (will become value type) + * @param Value type (will become key type) + * @return PTable<V, K> containing the same data as the original + */ + public static PTable swapKeyValue(PTable table) { + PTypeFamily ptf = table.getTypeFamily(); + return table.parallelDo(new MapFn, Pair>() { + @Override + public Pair map(Pair input) { + return Pair.of(input.second(), input.first()); + } + }, ptf.tableOf(table.getValueType(), table.getKeyType())); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java b/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java new file mode 100644 index 0000000..d6fc454 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib; + +import com.google.common.base.Function; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.PeekingIterator; +import org.apache.crunch.MapFn; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; + +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class Quantiles { + + /** + * Calculate a set of quantiles for each key in a numerically-valued table. + * + * Quantiles are calculated on a per-key basis by counting, joining and sorting. This is highly scalable, but takes + * 2 more map-reduce cycles than if you can guarantee that the value set will fit into memory. Use inMemory + * if you have less than the order of 10M values per key. + * + * The quantile definition that we use here is the "nearest rank" defined here: + * http://en.wikipedia.org/wiki/Percentile#Definition + * + * @param table numerically-valued PTable + * @param p1 First quantile (in the range 0.0 - 1.0) + * @param pn More quantiles (in the range 0.0 - 1.0) + * @param Key type of the table + * @param Value type of the table (must extend java.lang.Number) + * @return PTable of each key with a collection of pairs of the quantile provided and its result. + */ + public static PTable> distributed(PTable table, + double p1, double... 
pn) { + final List quantileList = createListFromVarargs(p1, pn); + + PTypeFamily ptf = table.getTypeFamily(); + PTable totalCounts = table.keys().count(); + PTable> countValuePairs = totalCounts.join(table); + PTable> valueCountPairs = + countValuePairs.mapValues(new SwapPairComponents(), ptf.pairs(table.getValueType(), ptf.longs())); + + + return SecondarySort.sortAndApply( + valueCountPairs, + new DistributedQuantiles(quantileList), + ptf.tableOf(table.getKeyType(), Result.pType(table.getValueType()))); + } + + /** + * Calculate a set of quantiles for each key in a numerically-valued table. + * + * Quantiles are calculated on a per-key basis by grouping, reading the data into memory, then sorting and + * calculating. This is much faster than the distributed option, but if you get into the order of 10M+ per key, then + * performance might start to degrade or even cause OOMs. + * + * The quantile definition that we use here is the "nearest rank" defined here: + * http://en.wikipedia.org/wiki/Percentile#Definition + * + * @param table numerically-valued PTable + * @param p1 First quantile (in the range 0.0 - 1.0) + * @param pn More quantiles (in the range 0.0 - 1.0) + * @param Key type of the table + * @param Value type of the table (must extend java.lang.Number) + * @return PTable of each key with a collection of pairs of the quantile provided and its result. + */ + public static PTable> inMemory(PTable table, + double p1, double... 
pn) { + final List quantileList = createListFromVarargs(p1, pn); + + PTypeFamily ptf = table.getTypeFamily(); + + return table + .groupByKey() + .parallelDo(new InMemoryQuantiles(quantileList), + ptf.tableOf(table.getKeyType(), Result.pType(table.getValueType()))); + } + + private static List createListFromVarargs(double p1, double[] pn) { + final List quantileList = Lists.newArrayList(p1); + for (double p: pn) { + quantileList.add(p); + } + return quantileList; + } + + private static class SwapPairComponents extends MapFn, Pair> { + @Override + public Pair map(Pair input) { + return Pair.of(input.second(), input.first()); + } + } + + private static Collection> findQuantiles(Iterator sortedCollectionIterator, + long collectionSize, List quantiles) { + Collection> output = Lists.newArrayList(); + Multimap quantileIndices = ArrayListMultimap.create(); + + for (double quantile: quantiles) { + long idx = Math.max((int) Math.ceil(quantile * collectionSize) - 1, 0); + quantileIndices.put(idx, quantile); + } + + long index = 0; + while (sortedCollectionIterator.hasNext()) { + V value = sortedCollectionIterator.next(); + if (quantileIndices.containsKey(index)) { + for (double quantile: quantileIndices.get(index)) { + output.add(Pair.of(quantile, value)); + } + } + index++; + } + return output; + } + + private static class InMemoryQuantiles extends + MapFn>, Pair>> { + private final List quantileList; + + public InMemoryQuantiles(List quantiles) { + this.quantileList = quantiles; + } + + @Override + public Pair> map(Pair> input) { + List values = Lists.newArrayList(input.second().iterator()); + Collections.sort(values); + return Pair.of(input.first(), new Result(values.size(), findQuantiles(values.iterator(), values.size(), quantileList))); + } + } + + private static class DistributedQuantiles extends + MapFn>>, Pair>> { + private final List quantileList; + + public DistributedQuantiles(List quantileList) { + this.quantileList = quantileList; + } + + @Override + public 
Pair> map(Pair>> input) { + + PeekingIterator> iterator = Iterators.peekingIterator(input.second().iterator()); + long count = iterator.peek().second(); + + Iterator valueIterator = Iterators.transform(iterator, new Function, V>() { + @Override + public V apply(@Nullable Pair input) { + return input.first(); + } + }); + + Collection> output = findQuantiles(valueIterator, count, quantileList); + return Pair.of(input.first(), new Result(count, output)); + } + } + + /** + * Output type for storing the results of a Quantiles computation + * @param Quantile value type + */ + public static class Result { + public final long count; + public final Map quantiles = Maps.newTreeMap(); + + public Result(long count, Iterable> quantiles) { + this.count = count; + for (Pair quantile: quantiles) { + this.quantiles.put(quantile.first(), quantile.second()); + } + } + + /** + * Create a PType for the result type, to be stored as a derived type from Crunch primitives + * @param valuePType PType for the V type, whose family will also be used to create the derived type + * @param Value type + * @return PType for serializing Result<V> + */ + public static PType> pType(PType valuePType) { + PTypeFamily ptf = valuePType.getFamily(); + + @SuppressWarnings("unchecked") + Class> prClass = (Class>)(Class)Result.class; + + return ptf.derivedImmutable(prClass, new MapFn>, Long>, Result>() { + @Override + public Result map(Pair>, Long> input) { + return new Result(input.second(), input.first()); + } + }, new MapFn, Pair>, Long>>() { + @Override + public Pair>, Long> map(Result input) { + return Pair.of(asCollection(input.quantiles), input.count); + } + }, ptf.pairs(ptf.collections(ptf.pairs(ptf.doubles(), valuePType)), ptf.longs())); + } + + private static Collection> asCollection(Map map) { + Collection> collection = Lists.newArrayList(); + for (Map.Entry entry: map.entrySet()) { + collection.add(Pair.of(entry.getKey(), entry.getValue())); + } + return collection; + } + + @Override + public 
boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Result result = (Result) o; + + if (count != result.count) return false; + if (!quantiles.equals(result.quantiles)) return false; + + return true; + } + + @Override + public int hashCode() { + int result = (int) (count ^ (count >>> 32)); + result = 31 * result + quantiles.hashCode(); + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/main/java/org/apache/crunch/lib/TopList.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/TopList.java b/crunch-core/src/main/java/org/apache/crunch/lib/TopList.java new file mode 100644 index 0000000..54d20f9 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/lib/TopList.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib; + +import com.google.common.collect.Lists; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; + +import java.util.Collection; +import java.util.Iterator; + +/** + * Tools for creating top lists of items in PTables and PCollections + */ +public class TopList { + + /** + * Create a top-list of elements in the provided PTable, categorised by the key of the input table and using the count + * of the value part of the input table. Example: if input = Table(Country, Track), then this will give you the most + * common n tracks for each country. + * @param input table of X Y pairs + * @param n How many Y values to include in the toplist per X (this will be in memory, so don't make this ridiculous) + * @param group type + * @param value type + * @return table of each unique X value mapped to a collection of (count, Y) pairs + */ + public static PTable>> topNYbyX(PTable input, final int n) { + final PType xType = input.getKeyType(); + final PType yType = input.getValueType(); + PTypeFamily f = xType.getFamily(); + PTable> counted = input.count().parallelDo(new MapFn, Long>, Pair>>() { + @Override + public Pair> map(Pair, Long> input) { + return Pair.of(input.first().first(), Pair.of(-input.second(), input.first().second())); + } + }, f.tableOf(xType, f.pairs(f.longs(), yType))); + return SecondarySort.sortAndApply(counted, new MapFn>>, Pair>>>() { + + private PTableType tableType; + + @Override + public void initialize() { + PTypeFamily ptf = yType.getFamily(); + tableType = ptf.tableOf(ptf.longs(), yType); + tableType.initialize(getConfiguration()); + } + + @Override + public Pair>> map(Pair>> input) { + Collection> values = Lists.newArrayList(); + Iterator> iter = input.second().iterator(); + for (int i = 0; i < n; i++) { + 
if (!iter.hasNext()) { + break; + } + Pair pair = PTables.getDetachedValue(tableType, iter.next()); + values.add(Pair.of(-pair.first(), pair.second())); + } + return Pair.of(input.first(), values); + } + }, f.tableOf(xType, f.collections(f.pairs(f.longs(), yType)))); + } + + /** + * Create a list of unique items in the input collection with their count, sorted descending by their frequency. + * @param input input collection + * @param record type + * @return global toplist + */ + public static PTable globalToplist(PCollection input) { + return negateCounts(negateCounts(input.count()).groupByKey(1).ungroup()); + } + + /** + * When creating toplists, it is often required to sort by count descending. As some sort operations don't support + * order (such as SecondarySort), this method will negate counts so that a natural-ordered sort will produce a + * descending order. + * @param table PTable to process + * @param key type + * @return PTable of the same format with the value negated + */ + public static PTable negateCounts(PTable table) { + return table.parallelDo(new MapFn, Pair>() { + @Override + public Pair map(Pair input) { + return Pair.of(input.first(), -input.second()); + } + }, table.getPTableType()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/test/java/org/apache/crunch/lib/AverageTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/AverageTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/AverageTest.java new file mode 100644 index 0000000..6fadd35 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/lib/AverageTest.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import com.google.common.collect.ImmutableMap; +import org.apache.crunch.PTable; +import org.apache.crunch.impl.mem.MemPipeline; +import org.junit.Test; + +import java.util.Map; + +import static org.apache.crunch.types.avro.Avros.*; +import static org.junit.Assert.assertEquals; + +public class AverageTest { + @Test + public void testMeanValue() { + PTable testTable = MemPipeline.typedTableOf( + tableOf(strings(), ints()), + "a", 2, + "a", 10, + "b", 3, + "c", 3, + "c", 4, + "c", 5); + Map actual = Average.meanValue(testTable).materializeToMap(); + Map expected = ImmutableMap.of( + "a", 6.0, + "b", 3.0, + "c", 4.0 + ); + + assertEquals(expected, actual); + } + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/test/java/org/apache/crunch/lib/DoFnsTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/DoFnsTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/DoFnsTest.java new file mode 100644 index 0000000..3e70882 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/lib/DoFnsTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import com.google.common.base.Strings; +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Lists; +import org.apache.avro.util.Utf8; +import org.apache.crunch.DoFn; +import org.apache.crunch.MapFn; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.mem.emit.InMemoryEmitter; +import org.apache.crunch.test.Employee; +import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +import java.util.Collection; +import java.util.Iterator; + +import static org.junit.Assert.assertEquals; + +public class DoFnsTest { + + private static class AvroIterable implements Iterable { + @Override + public Iterator iterator() { + final Employee rec = new Employee(new Utf8("something"), 10000, new Utf8("")); + return new AbstractIterator() { + private int n = 0; + @Override + protected Employee computeNext() { + n++; + if (n > 3) return endOfData(); + rec.setDepartment(new Utf8(Strings.repeat("*", n))); + return rec; + } + }; + } + } + + private static class CollectingMapFn extends + MapFn>, Collection> { + + @Override + public Collection map(Pair> input) { + return Lists.newArrayList(input.second()); + } + } + + @Test + public void testDetach() { + Collection 
expected = Lists.newArrayList( + new Employee(new Utf8("something"), 10000, new Utf8("*")), + new Employee(new Utf8("something"), 10000, new Utf8("**")), + new Employee(new Utf8("something"), 10000, new Utf8("***")) + ); + DoFn>, Collection> doFn = + DoFns.detach(new CollectingMapFn(), Avros.specifics(Employee.class)); + Pair> input = Pair.of("key", (Iterable) new AvroIterable()); + InMemoryEmitter> emitter = new InMemoryEmitter>(); + + doFn.configure(new Configuration()); + doFn.initialize(); + doFn.process(input, emitter); + doFn.cleanup(emitter); + + assertEquals(expected, emitter.getOutput().get(0)); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/test/java/org/apache/crunch/lib/PTablesTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/PTablesTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/PTablesTest.java new file mode 100644 index 0000000..a53a33c --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/lib/PTablesTest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib; + +import com.google.common.collect.ImmutableMap; +import org.apache.crunch.PTable; +import org.apache.crunch.impl.mem.MemPipeline; +import org.junit.Test; + +import java.util.Map; + +import static org.apache.crunch.types.avro.Avros.longs; +import static org.apache.crunch.types.avro.Avros.strings; +import static org.apache.crunch.types.avro.Avros.tableOf; +import static org.junit.Assert.assertEquals; + +public class PTablesTest { + @Test + public void testSwapKeyValue() { + PTable table = MemPipeline.typedTableOf(tableOf(strings(), longs()), "hello", 14L, "goodbye", 21L); + PTable actual = PTables.swapKeyValue(table); + Map expected = ImmutableMap.of(14L, "hello", 21L, "goodbye"); + assertEquals(expected, actual.materializeToMap()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/test/java/org/apache/crunch/lib/QuantilesTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/QuantilesTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/QuantilesTest.java new file mode 100644 index 0000000..ea2a312 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/lib/QuantilesTest.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.lib.Quantiles.Result; + +import org.junit.Test; + +import java.util.Map; + +import static org.apache.crunch.types.avro.Avros.*; +import static org.junit.Assert.assertEquals; + +public class QuantilesTest { + + private static Quantiles.Result result(long count, Pair... quantiles) { + return new Quantiles.Result(count, Lists.newArrayList(quantiles)); + } + + @Test + public void testQuantilesExact() { + PTable testTable = MemPipeline.typedTableOf( + tableOf(strings(), ints()), + "a", 5, + "a", 2, + "a", 3, + "a", 4, + "a", 1); + Map> actualS = Quantiles.distributed(testTable, 0, 0.5, 1.0).materializeToMap(); + Map> actualM = Quantiles.inMemory(testTable, 0, 0.5, 1.0).materializeToMap(); + Map> expected = ImmutableMap.of( + "a", result(5, Pair.of(0.0, 1), Pair.of(0.5, 3), Pair.of(1.0, 5)) + ); + + assertEquals(expected, actualS); + assertEquals(expected, actualM); + } + + @Test + public void testQuantilesBetween() { + PTable testTable = MemPipeline.typedTableOf( + tableOf(strings(), ints()), + "a", 5, + "a", 2, // We expect the 0.5 to correspond to this element, according to the "nearest rank" %ile definition. 
+ "a", 4, + "a", 1); + Map> actualS = Quantiles.distributed(testTable, 0.5).materializeToMap(); + Map> actualM = Quantiles.inMemory(testTable, 0.5).materializeToMap(); + Map> expected = ImmutableMap.of( + "a", result(4, Pair.of(0.5, 2)) + ); + + assertEquals(expected, actualS); + assertEquals(expected, actualM); + } + + @Test + public void testQuantilesNines() { + PTable testTable = MemPipeline.typedTableOf( + tableOf(strings(), ints()), + "a", 10, + "a", 20, + "a", 30, + "a", 40, + "a", 50, + "a", 60, + "a", 70, + "a", 80, + "a", 90, + "a", 100); + Map> actualS = Quantiles.distributed(testTable, 0.9, 0.99).materializeToMap(); + Map> actualM = Quantiles.inMemory(testTable, 0.9, 0.99).materializeToMap(); + Map> expected = ImmutableMap.of( + "a", result(10, Pair.of(0.9, 90), Pair.of(0.99, 100)) + ); + + assertEquals(expected, actualS); + assertEquals(expected, actualM); + } + + @Test + public void testQuantilesLessThanOrEqual() { + PTable testTable = MemPipeline.typedTableOf( + tableOf(strings(), ints()), + "a", 10, + "a", 20, + "a", 30, + "a", 40, + "a", 50, + "a", 60, + "a", 70, + "a", 80, + "a", 90, + "a", 100); + Map> actualS = Quantiles.distributed(testTable, 0.5).materializeToMap(); + Map> actualM = Quantiles.inMemory(testTable, 0.5).materializeToMap(); + Map> expected = ImmutableMap.of( + "a", result(10, Pair.of(0.5, 50)) + ); + + assertEquals(expected, actualS); + assertEquals(expected, actualM); + } + + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/test/java/org/apache/crunch/lib/TopListTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/TopListTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/TopListTest.java new file mode 100644 index 0000000..3d0cdbd --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/lib/TopListTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.mem.MemPipeline; +import org.junit.Test; + +import java.util.Collection; +import java.util.Map; + +import static org.apache.crunch.types.avro.Avros.strings; +import static org.apache.crunch.types.avro.Avros.tableOf; +import static org.junit.Assert.assertEquals; + +public class TopListTest { + + private Collection collectionOf(T... 
elements) { + return Lists.newArrayList(elements); + } + + @Test + public void testTopNYbyX() { + PTable data = MemPipeline.typedTableOf(tableOf(strings(), strings()), + "a", "x", + "a", "x", + "a", "x", + "a", "y", + "a", "y", + "a", "z", + "b", "x", + "b", "x", + "b", "z"); + Map>> actual = TopList.topNYbyX(data, 2).materializeToMap(); + Map>> expected = ImmutableMap.of( + "a", collectionOf(Pair.of(3L, "x"), Pair.of(2L, "y")), + "b", collectionOf(Pair.of(2L, "x"), Pair.of(1L, "z"))); + + assertEquals(expected, actual); + } + + @Test + public void testGlobalToplist() { + PCollection data = MemPipeline.typedCollectionOf(strings(), "a", "a", "a", "b", "b", "c", "c", "c", "c"); + Map actual = TopList.globalToplist(data).materializeToMap(); + Map expected = ImmutableMap.of("c", 4L, "a", 3L, "b", 2L); + assertEquals(expected, actual); + } +} \ No newline at end of file