Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9FBB81148B for ; Mon, 14 Jul 2014 14:31:59 +0000 (UTC) Received: (qmail 27441 invoked by uid 500); 14 Jul 2014 14:31:59 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 27407 invoked by uid 500); 14 Jul 2014 14:31:59 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 27393 invoked by uid 99); 14 Jul 2014 14:31:59 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Jul 2014 14:31:59 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 048258BDB5F; Mon, 14 Jul 2014 14:31:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-432: Create JoinStrategy APIs for Scrunch Date: Mon, 14 Jul 2014 14:31:58 +0000 (UTC) Repository: crunch Updated Branches: refs/heads/apache-crunch-0.8 a32e61cc9 -> 6e52d7fc8 CRUNCH-432: Create JoinStrategy APIs for Scrunch Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/6e52d7fc Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/6e52d7fc Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/6e52d7fc Branch: refs/heads/apache-crunch-0.8 Commit: 6e52d7fc86f074ccaa3e62efdd919fc26233a08a Parents: a32e61c Author: Josh Wills Authored: Thu Jul 3 14:34:00 2014 -0700 Committer: Josh Wills Committed: Mon Jul 14 07:25:46 2014 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/crunch/fn/SwapFn.java | 45 +++++++ .../org/apache/crunch/scrunch/JoinTest.scala | 15 ++- .../scala/org/apache/crunch/scrunch/Joins.scala | 128 +++++++++++++++++++ .../org/apache/crunch/scrunch/PCollection.scala | 1 - .../apache/crunch/scrunch/PCollectionLike.scala | 16 +++ .../org/apache/crunch/scrunch/PTable.scala | 26 +++- 6 files changed, 228 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/6e52d7fc/crunch-core/src/main/java/org/apache/crunch/fn/SwapFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/SwapFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/SwapFn.java new file mode 100644 index 0000000..5d3c3d9 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/fn/SwapFn.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import org.apache.crunch.MapFn; +import org.apache.crunch.Pair; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; + +/** + * Swap the elements of a {@code Pair} type. + */ +public class SwapFn extends MapFn, Pair> { + + public static PType> ptype(PType> pt) { + return pt.getFamily().pairs(pt.getSubTypes().get(1), pt.getSubTypes().get(0)); + } + + public static PTableType tableType(PTableType ptt) { + return ptt.getFamily().tableOf(ptt.getValueType(), ptt.getKeyType()); + } + + @Override + public Pair map(Pair input) { + if (input == null) { + return null; + } + return Pair.of(input.second(), input.first()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6e52d7fc/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala index 57f974d..eb3b677 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala @@ -20,9 +20,10 @@ package org.apache.crunch.scrunch import org.apache.crunch.io.{From => from, To => to} import _root_.org.junit.Test +import org.apache.crunch.lib.join.JoinType class JoinTest extends CrunchSuite { - lazy val pipeline = Pipeline.mapReduce[CogroupTest](tempDir.getDefaultConfiguration) + lazy val pipeline = Pipeline.mapReduce[JoinTest](tempDir.getDefaultConfiguration) def wordCount(fileName: String) = { pipeline.read(from.textFile(fileName)) @@ -40,4 +41,16 @@ class JoinTest extends CrunchSuite { assert(filtered.exists(_ == ("macbeth", 66))) pipeline.done } + + @Test def joinMapside { + val shakespeare = tempDir.copyResourceFileName("shakes.txt") + val maugham = tempDir.copyResourceFileName("maugham.txt") + val output = tempDir.getFile("output") + val filtered = wordCount(shakespeare).innerJoinUsing(wordCount(maugham), Joins.mapside()) + .map((k, v) => (k, v._1 - v._2)) + .write(to.textFile(output.getAbsolutePath())) + .filter((k, d) => d > 0).materialize + assert(filtered.exists(_ == ("macbeth", 66))) + pipeline.done + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/6e52d7fc/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Joins.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Joins.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Joins.scala new file mode 100644 index 0000000..c6b7d62 --- /dev/null +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Joins.scala @@ -0,0 +1,128 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.crunch.scrunch + +import org.apache.crunch.{Pair => P} +import org.apache.crunch.lib.join._ +import org.apache.crunch.lib.join.ShardedJoinStrategy.ShardingStrategy +import org.apache.crunch +import org.apache.crunch.fn.SwapFn + +/** + * An adapter for the JoinStrategy interface that works with Scrunch PTables. + */ +class ScrunchJoinStrategy[K, U, V](val delegate: JoinStrategy[K, U, V]) { + def join(left: PTable[K, U], right: PTable[K, V], joinType: JoinType) = { + val jres = delegate.join(left.native, right.native, joinType) + val ptf = left.getTypeFamily() + val ptype = ptf.tableOf(left.keyType(), ptf.tuple2(left.valueType(), right.valueType())) + val inter = new PTable[K, P[U, V]](jres) + inter.parallelDo(new SMapTableValuesFn[K, P[U, V], (U, V)] { + def apply(x: P[U, V]) = (x.first(), x.second()) + }, ptype) + } +} + +private class ReverseJoinStrategy[K, U, V](val delegate: JoinStrategy[K, V, U]) extends JoinStrategy[K, U, V] { + override def join(left: crunch.PTable[K, U], right: crunch.PTable[K, V], joinType: JoinType) = { + val res: crunch.PTable[K, P[V, U]] = + if (joinType == JoinType.LEFT_OUTER_JOIN) { + delegate.join(right, left, JoinType.RIGHT_OUTER_JOIN) + } else if (joinType == JoinType.RIGHT_OUTER_JOIN) { + delegate.join(right, left, JoinType.LEFT_OUTER_JOIN) + } else { + delegate.join(right, left, joinType) + } + res.mapValues(new SwapFn[V, U](), SwapFn.ptype(res.getValueType)) + } +} + +object ScrunchJoinStrategy { + def apply[K, U, V](delegate: JoinStrategy[K, U, V]) = new ScrunchJoinStrategy[K, U, V](delegate) +} + +class ScalaShardingStrategy[K](val f: K => Int) extends ShardingStrategy[K] { + override def getNumShards(key: K): Int = f(key) +} + +/** + * Factory methods for the common JoinStrategy implementations in Crunch adapted to work with Scrunch + * PTables. + */ +object Joins { + + /** + * Strategy that implements a standard reduce-side join. + * + * @param numReducers the number of partitions to use to perform the shuffle + */ + def default[K, U, V](numReducers: Int = -1) = { + ScrunchJoinStrategy(new DefaultJoinStrategy[K, U, V](numReducers)) + } + + /** + * Strategy for performing a mapside-join in which the left-side PTable will be optionally + * materialized to disk and then loaded into memory while we stream over the contents right-side + * PTable. + * + * @param materialize Whether to materialize the left side table before loading it into + * memory, vs. performing any transformations it requires in-memory on the cluster + */ + def mapside[K, U, V](materialize: Boolean = true) = { + ScrunchJoinStrategy(new ReverseJoinStrategy[K, U, V](new MapsideJoinStrategy[K, V, U](materialize))) + } + + /** + * Join strategy that uses a bloom filter over the keys in the left hand PTable + * @param numElements the expected number of unique keys + * @param falsePositiveRate the acceptable false positive rate for the filter + * @param delegate the underlying join strategy to use once the bloom filter is created + */ + def bloom[K, U, V](numElements: Int, falsePositiveRate: Float = 0.05f, + delegate: ScrunchJoinStrategy[K, U, V] = default()) = { + ScrunchJoinStrategy(new BloomFilterJoinStrategy[K, U, V](numElements, falsePositiveRate, delegate.delegate)) + } + + /** + * Join strategy that shards the value associated with a single key across multiple reducers. + * Useful when one of the PTables has a skewed key distribution, where lots of values are + * associated with a small number of keys. + * @param numShards number of shards to use with the default sharding strategy + */ + def sharded[K, U, V](numShards: Int): ScrunchJoinStrategy[K, U, V] = { + ScrunchJoinStrategy(new ShardedJoinStrategy[K, U, V](numShards)) + } + + /** + * Sharded join that uses the given Scala function to determine the shard for each key. + */ + def sharded[K, U, V](f: K => Int): ScrunchJoinStrategy[K, U, V] = { + sharded[K, U, V](new ScalaShardingStrategy[K](f)) + } + + /** + * Sharded join with a custom sharding strategy. + * @param strategy the strategy to use + */ + def sharded[K, U, V](strategy: ShardingStrategy[K]): ScrunchJoinStrategy[K, U, V] = { + ScrunchJoinStrategy(new ShardedJoinStrategy[K, U, V](strategy)) + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6e52d7fc/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala index ed39d30..758620b 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala @@ -22,7 +22,6 @@ import scala.collection.JavaConversions import org.apache.crunch.{PCollection => JCollection, Pair => CPair, _} import org.apache.crunch.lib.{Aggregate, Cartesian, Sample} import org.apache.crunch.scrunch.Conversions._ -import org.apache.crunch.scrunch.interpreter.InterpreterRunner import org.apache.hadoop.mapreduce.TaskInputOutputContext import org.apache.crunch.types.PType import org.apache.crunch.fn.IdentityFn http://git-wip-us.apache.org/repos/asf/crunch/blob/6e52d7fc/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala index 35b90be..3d35bc6 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala @@ -65,6 +65,22 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] { } /** + * Cache the data in this instance using the default caching mechanism for the + * underlying Pipeline type. + * + * @return this instance + */ + def cache() = wrap(native.cache()) + + /** + * Cache the data in this instance using the given caching options, if they are + * applicable for the underlying Pipeline type. + * + * @return this instance + */ + def cache(opts: CachingOptions) = wrap(native.cache(opts)) + + /** * Apply a flatMap operation to this instance, returning a {@code PTable} if the return * type of the function is a {@code TraversableOnce[Tuple2]} and a {@code PCollection} otherwise. */ http://git-wip-us.apache.org/repos/asf/crunch/blob/6e52d7fc/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala index 6fab61a..50ddc5c 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala @@ -24,7 +24,6 @@ import scala.collection.JavaConversions._ import org.apache.crunch.{PCollection => JCollection, PTable => JTable, Pair => CPair, _} import org.apache.crunch.lib.{Cartesian, Aggregate, Cogroup, PTables} import org.apache.crunch.lib.join.{DefaultJoinStrategy, JoinType} -import org.apache.crunch.scrunch.interpreter.InterpreterRunner import org.apache.crunch.types.{PTableType, PType} import scala.collection.Iterable import org.apache.hadoop.mapreduce.TaskInputOutputContext @@ -127,6 +126,31 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V] join[V2](other, JoinType.FULL_OUTER_JOIN, parallelism) } + def joinUsing[V2](other: PTable[K, V2], strategy: ScrunchJoinStrategy[K, V, V2], + joinType: JoinType): PTable[K, (V, V2)] = { + strategy.join(this, other, joinType) + } + + def joinUsing[V2](other: PTable[K, V2], strategy: ScrunchJoinStrategy[K, V, V2]): PTable[K, (V, V2)] = { + innerJoinUsing[V2](other, strategy) + } + + def innerJoinUsing[V2](other: PTable[K, V2], strategy: ScrunchJoinStrategy[K, V, V2]) = { + joinUsing[V2](other, strategy, JoinType.INNER_JOIN) + } + + def leftJoinUsing[V2](other: PTable[K, V2], strategy: ScrunchJoinStrategy[K, V, V2]) = { + joinUsing[V2](other, strategy, JoinType.LEFT_OUTER_JOIN) + } + + def rightJoinUsing[V2](other: PTable[K, V2], strategy: ScrunchJoinStrategy[K, V, V2]) = { + joinUsing[V2](other, strategy, JoinType.RIGHT_OUTER_JOIN) + } + + def fullJoinUsing[V2](other: PTable[K, V2], strategy: ScrunchJoinStrategy[K, V, V2]) = { + joinUsing[V2](other, strategy, JoinType.FULL_OUTER_JOIN) + } + def cross[K2, V2](other: PTable[K2, V2]): PTable[(K, K2), (V, V2)] = { val ptf = getTypeFamily() val inter = new PTable(Cartesian.cross(this.native, other.native))