crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-432: Create JoinStrategy APIs for Scrunch
Date Mon, 14 Jul 2014 14:31:58 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 a32e61cc9 -> 6e52d7fc8


CRUNCH-432: Create JoinStrategy APIs for Scrunch


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/6e52d7fc
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/6e52d7fc
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/6e52d7fc

Branch: refs/heads/apache-crunch-0.8
Commit: 6e52d7fc86f074ccaa3e62efdd919fc26233a08a
Parents: a32e61c
Author: Josh Wills <jwills@apache.org>
Authored: Thu Jul 3 14:34:00 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Mon Jul 14 07:25:46 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/fn/SwapFn.java  |  45 +++++++
 .../org/apache/crunch/scrunch/JoinTest.scala    |  15 ++-
 .../scala/org/apache/crunch/scrunch/Joins.scala | 128 +++++++++++++++++++
 .../org/apache/crunch/scrunch/PCollection.scala |   1 -
 .../apache/crunch/scrunch/PCollectionLike.scala |  16 +++
 .../org/apache/crunch/scrunch/PTable.scala      |  26 +++-
 6 files changed, 228 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/6e52d7fc/crunch-core/src/main/java/org/apache/crunch/fn/SwapFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/SwapFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/SwapFn.java
new file mode 100644
index 0000000..5d3c3d9
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/SwapFn.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+/**
+ * Swap the elements of a {@code Pair} type.
+ */
+public class SwapFn<V1, V2> extends MapFn<Pair<V1, V2>, Pair<V2, V1>> {
+
+  /**
+   * Derives the type of the swapped pairs from the type of the input pairs.
+   *
+   * @param pt the input pair type; its subtypes at index 0 and 1 are taken as the first
+   *           and second element types
+   * @return a pair type from the same type family as {@code pt}, with the element types reversed
+   */
+  public static <V1, V2> PType<Pair<V2, V1>> ptype(PType<Pair<V1, V2>> pt) {
+    return pt.getFamily().pairs(
+        pt.getSubTypes().get(1),
+        pt.getSubTypes().get(0));
+  }
+
+  /**
+   * Derives the table type whose keys are the values of {@code ptt} and whose values are its keys.
+   *
+   * @param ptt the input table type
+   * @return a table type from the same type family as {@code ptt}, keyed by its value type
+   */
+  public static <K, V> PTableType<V, K> tableType(PTableType<K, V> ptt) {
+    PType<V> swappedKeyType = ptt.getValueType();
+    PType<K> swappedValueType = ptt.getKeyType();
+    return ptt.getFamily().tableOf(swappedKeyType, swappedValueType);
+  }
+
+  /**
+   * Swaps the two elements of a pair; a {@code null} input yields {@code null}.
+   */
+  @Override
+  public Pair<V2, V1> map(Pair<V1, V2> input) {
+    return (input == null) ? null : Pair.of(input.second(), input.first());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e52d7fc/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala
index 57f974d..eb3b677 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala
@@ -20,9 +20,10 @@ package org.apache.crunch.scrunch
 import org.apache.crunch.io.{From => from, To => to}
 
 import _root_.org.junit.Test
+import org.apache.crunch.lib.join.JoinType
 
 class JoinTest extends CrunchSuite {
-  lazy val pipeline = Pipeline.mapReduce[CogroupTest](tempDir.getDefaultConfiguration)
+  lazy val pipeline = Pipeline.mapReduce[JoinTest](tempDir.getDefaultConfiguration)
 
   def wordCount(fileName: String) = {
     pipeline.read(from.textFile(fileName))
@@ -40,4 +41,16 @@ class JoinTest extends CrunchSuite {
     assert(filtered.exists(_ == ("macbeth", 66)))
     pipeline.done
   }
+
+  /**
+   * Joins the word counts of two texts with the map-side strategy from [[Joins.mapside]],
+   * writes the per-word count differences, and checks a known positive difference.
+   */
+  @Test def joinMapside {
+    val shakesFile = tempDir.copyResourceFileName("shakes.txt")
+    val maughamFile = tempDir.copyResourceFileName("maugham.txt")
+    val outDir = tempDir.getFile("output")
+    val shakesCounts = wordCount(shakesFile)
+    val maughamCounts = wordCount(maughamFile)
+    val positiveDiffs = shakesCounts
+      .innerJoinUsing(maughamCounts, Joins.mapside())
+      .map((word, counts) => (word, counts._1 - counts._2))
+      .write(to.textFile(outDir.getAbsolutePath()))
+      .filter((word, diff) => diff > 0)
+      .materialize
+    assert(positiveDiffs.exists(_ == ("macbeth", 66)))
+    pipeline.done
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e52d7fc/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Joins.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Joins.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Joins.scala
new file mode 100644
index 0000000..c6b7d62
--- /dev/null
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Joins.scala
@@ -0,0 +1,128 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.crunch.scrunch
+
+import org.apache.crunch.{Pair => P}
+import org.apache.crunch.lib.join._
+import org.apache.crunch.lib.join.ShardedJoinStrategy.ShardingStrategy
+import org.apache.crunch
+import org.apache.crunch.fn.SwapFn
+
+/**
+ * An adapter for the JoinStrategy interface that works with Scrunch PTables.
+ *
+ * The wrapped Crunch strategy produces Crunch `Pair` values; this adapter converts them
+ * into Scala tuples.
+ *
+ * @param delegate the Crunch join strategy that performs the actual join
+ * @tparam K the key type shared by both tables
+ * @tparam U the value type of the left table
+ * @tparam V the value type of the right table
+ */
+class ScrunchJoinStrategy[K, U, V](val delegate: JoinStrategy[K, U, V]) {
+  /**
+   * Joins two Scrunch tables on their keys using the wrapped strategy.
+   *
+   * NOTE(review): for outer joins the unmatched side presumably arrives as null inside the
+   * tuple — confirm against the delegate strategy.
+   *
+   * @param left the left-hand table
+   * @param right the right-hand table
+   * @param joinType the kind of join to perform (e.g. INNER_JOIN, LEFT_OUTER_JOIN)
+   * @return a table mapping each key to a `(leftValue, rightValue)` tuple
+   */
+  def join(left: PTable[K, U], right: PTable[K, V], joinType: JoinType): PTable[K, (U, V)] = {
+    val jres = delegate.join(left.native, right.native, joinType)
+    // The output table type is built from the left table's type family.
+    val ptf = left.getTypeFamily()
+    val ptype = ptf.tableOf(left.keyType(), ptf.tuple2(left.valueType(), right.valueType()))
+    val inter = new PTable[K, P[U, V]](jres)
+    // Convert each Crunch Pair value into a Scala Tuple2.
+    inter.parallelDo(new SMapTableValuesFn[K, P[U, V], (U, V)] {
+      def apply(x: P[U, V]) = (x.first(), x.second())
+    }, ptype)
+  }
+}
+
+/**
+ * A JoinStrategy that runs its delegate with the two input tables swapped, then swaps each
+ * resulting value pair back so callers still see `(left, right)` ordering.
+ *
+ * [[Joins.mapside]] uses it so that the caller's left table is the one the delegate receives
+ * as its right-hand input.
+ *
+ * Because the inputs are swapped, one-sided outer joins are mirrored: LEFT_OUTER_JOIN is
+ * delegated as RIGHT_OUTER_JOIN and vice versa. All other join types pass through unchanged.
+ *
+ * @param delegate the strategy to run on the swapped inputs
+ */
+private class ReverseJoinStrategy[K, U, V](val delegate: JoinStrategy[K, V, U])
+  extends JoinStrategy[K, U, V] {
+
+  override def join(left: crunch.PTable[K, U], right: crunch.PTable[K, V],
+                    joinType: JoinType): crunch.PTable[K, P[U, V]] = {
+    // Swapping the inputs swaps which side is "outer", so mirror the one-sided outer joins.
+    val mirrored = joinType match {
+      case JoinType.LEFT_OUTER_JOIN => JoinType.RIGHT_OUTER_JOIN
+      case JoinType.RIGHT_OUTER_JOIN => JoinType.LEFT_OUTER_JOIN
+      case other => other
+    }
+    val res: crunch.PTable[K, P[V, U]] = delegate.join(right, left, mirrored)
+    // Restore (left, right) ordering in the joined values.
+    res.mapValues(new SwapFn[V, U](), SwapFn.ptype(res.getValueType))
+  }
+}
+
+/** Factory for [[ScrunchJoinStrategy]] instances. */
+object ScrunchJoinStrategy {
+  /** Wraps a Crunch `JoinStrategy` so that it can be used with Scrunch tables. */
+  def apply[K, U, V](delegate: JoinStrategy[K, U, V]): ScrunchJoinStrategy[K, U, V] =
+    new ScrunchJoinStrategy(delegate)
+}
+
+/**
+ * Adapts a Scala function to Crunch's ShardingStrategy interface.
+ *
+ * @param f computes, for a given key, the value returned from `getNumShards`
+ */
+class ScalaShardingStrategy[K](val f: K => Int) extends ShardingStrategy[K] {
+  override def getNumShards(key: K): Int = f.apply(key)
+}
+
+/**
+ * Factory methods for the common JoinStrategy implementations in Crunch, adapted to work with
+ * Scrunch PTables.
+ */
+object Joins {
+
+  /**
+   * Strategy that implements a standard reduce-side join.
+   *
+   * @param numReducers the number of partitions to use to perform the shuffle. The default
+   *                    of -1 is passed straight through to DefaultJoinStrategy (presumably
+   *                    meaning "let Crunch decide" — confirm).
+   */
+  def default[K, U, V](numReducers: Int = -1): ScrunchJoinStrategy[K, U, V] = {
+    ScrunchJoinStrategy(new DefaultJoinStrategy[K, U, V](numReducers))
+  }
+
+  /**
+   * Strategy for performing a map-side join. The left-side PTable is optionally materialized
+   * to disk and then loaded into memory while we stream over the contents of the right-side
+   * PTable.
+   *
+   * This is implemented by running Crunch's MapsideJoinStrategy with the two tables swapped,
+   * so the left table is the one that strategy receives as its right-hand input.
+   * NOTE(review): the underlying MapsideJoinStrategy may not support every JoinType — confirm
+   * which join types are valid with this strategy.
+   *
+   * @param materialize whether to materialize the left-side table before loading it into
+   *                    memory, rather than performing any transformations it requires
+   *                    in-memory on the cluster
+   */
+  def mapside[K, U, V](materialize: Boolean = true): ScrunchJoinStrategy[K, U, V] = {
+    ScrunchJoinStrategy(
+      new ReverseJoinStrategy[K, U, V](new MapsideJoinStrategy[K, V, U](materialize)))
+  }
+
+  /**
+   * Join strategy that uses a bloom filter over the keys in the left-hand PTable.
+   *
+   * @param numElements the expected number of unique keys
+   * @param falsePositiveRate the acceptable false positive rate for the filter
+   * @param delegate the underlying join strategy to use once the bloom filter is created
+   */
+  def bloom[K, U, V](numElements: Int, falsePositiveRate: Float = 0.05f,
+                     delegate: ScrunchJoinStrategy[K, U, V] = default()): ScrunchJoinStrategy[K, U, V] = {
+    ScrunchJoinStrategy(new BloomFilterJoinStrategy[K, U, V](numElements, falsePositiveRate,
+      delegate.delegate))
+  }
+
+  /**
+   * Join strategy that shards the values associated with a single key across multiple reducers.
+   * Useful when one of the PTables has a skewed key distribution, where lots of values are
+   * associated with a small number of keys.
+   *
+   * @param numShards number of shards to use with the default sharding strategy
+   */
+  def sharded[K, U, V](numShards: Int): ScrunchJoinStrategy[K, U, V] = {
+    ScrunchJoinStrategy(new ShardedJoinStrategy[K, U, V](numShards))
+  }
+
+  /**
+   * Sharded join that uses the given Scala function to determine the number of shards for
+   * each key.
+   *
+   * @param f maps a key to its number of shards
+   */
+  def sharded[K, U, V](f: K => Int): ScrunchJoinStrategy[K, U, V] = {
+    sharded[K, U, V](new ScalaShardingStrategy[K](f))
+  }
+
+  /**
+   * Sharded join with a custom sharding strategy.
+   *
+   * @param strategy the strategy to use
+   */
+  def sharded[K, U, V](strategy: ShardingStrategy[K]): ScrunchJoinStrategy[K, U, V] = {
+    ScrunchJoinStrategy(new ShardedJoinStrategy[K, U, V](strategy))
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e52d7fc/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
index ed39d30..758620b 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
@@ -22,7 +22,6 @@ import scala.collection.JavaConversions
 import org.apache.crunch.{PCollection => JCollection, Pair => CPair, _}
 import org.apache.crunch.lib.{Aggregate, Cartesian, Sample}
 import org.apache.crunch.scrunch.Conversions._
-import org.apache.crunch.scrunch.interpreter.InterpreterRunner
 import org.apache.hadoop.mapreduce.TaskInputOutputContext
 import org.apache.crunch.types.PType
 import org.apache.crunch.fn.IdentityFn

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e52d7fc/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
index 35b90be..3d35bc6 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
@@ -65,6 +65,22 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
   }
 
   /**
+   * Cache the data in this instance using the default caching mechanism for the
+   * underlying Pipeline type.
+   *
+   * @return the result of `wrap` applied to the collection returned by `native.cache()`
+   */
+  def cache() = wrap(native.cache())
+
+  /**
+   * Cache the data in this instance using the given caching options, if they are
+   * applicable for the underlying Pipeline type.
+   *
+   * @param opts the caching options passed through to the native `cache` call
+   * @return the result of `wrap` applied to the collection returned by `native.cache(opts)`
+   */
+  def cache(opts: CachingOptions) = wrap(native.cache(opts))
+
+  /**
    * Apply a flatMap operation to this instance, returning a {@code PTable} if the return
   * type of the function is a {@code TraversableOnce[Tuple2]} and a {@code PCollection} otherwise.
    */

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e52d7fc/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
index 6fab61a..50ddc5c 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConversions._
 import org.apache.crunch.{PCollection => JCollection, PTable => JTable, Pair =>
CPair, _}
 import org.apache.crunch.lib.{Cartesian, Aggregate, Cogroup, PTables}
 import org.apache.crunch.lib.join.{DefaultJoinStrategy, JoinType}
-import org.apache.crunch.scrunch.interpreter.InterpreterRunner
 import org.apache.crunch.types.{PTableType, PType}
 import scala.collection.Iterable
 import org.apache.hadoop.mapreduce.TaskInputOutputContext
@@ -127,6 +126,31 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
     join[V2](other, JoinType.FULL_OUTER_JOIN, parallelism)
   }
 
+  /**
+   * Joins this table with `other` on their keys using the given strategy and join type.
+   *
+   * @param other the right-hand table
+   * @param strategy the Scrunch join strategy that performs the join
+   * @param joinType the kind of join to perform
+   * @return a table of `(thisValue, otherValue)` tuples keyed by the shared key
+   */
+  def joinUsing[V2](other: PTable[K, V2], strategy: ScrunchJoinStrategy[K, V, V2],
+                    joinType: JoinType): PTable[K, (V, V2)] =
+    strategy.join(this, other, joinType)
+
+  /**
+   * Inner-joins this table with `other` using the given strategy; equivalent to
+   * `innerJoinUsing(other, strategy)`.
+   */
+  def joinUsing[V2](other: PTable[K, V2],
+                    strategy: ScrunchJoinStrategy[K, V, V2]): PTable[K, (V, V2)] =
+    innerJoinUsing[V2](other, strategy)
+
+  /**
+   * Joins this table with `other` using the given strategy and `JoinType.INNER_JOIN`.
+   *
+   * @param other the right-hand table
+   * @param strategy the Scrunch join strategy that performs the join
+   * @return a table of `(thisValue, otherValue)` tuples keyed by the shared key
+   */
+  def innerJoinUsing[V2](other: PTable[K, V2],
+                         strategy: ScrunchJoinStrategy[K, V, V2]): PTable[K, (V, V2)] = {
+    joinUsing[V2](other, strategy, JoinType.INNER_JOIN)
+  }
+
+  /**
+   * Joins this table with `other` using the given strategy and `JoinType.LEFT_OUTER_JOIN`.
+   *
+   * NOTE(review): keys with no match in `other` presumably carry a null second tuple
+   * element — confirm against the strategy in use.
+   *
+   * @param other the right-hand table
+   * @param strategy the Scrunch join strategy that performs the join
+   * @return a table of `(thisValue, otherValue)` tuples keyed by the shared key
+   */
+  def leftJoinUsing[V2](other: PTable[K, V2],
+                        strategy: ScrunchJoinStrategy[K, V, V2]): PTable[K, (V, V2)] = {
+    joinUsing[V2](other, strategy, JoinType.LEFT_OUTER_JOIN)
+  }
+
+  /**
+   * Joins this table with `other` using the given strategy and `JoinType.RIGHT_OUTER_JOIN`.
+   *
+   * NOTE(review): keys with no match in this table presumably carry a null first tuple
+   * element — confirm against the strategy in use.
+   *
+   * @param other the right-hand table
+   * @param strategy the Scrunch join strategy that performs the join
+   * @return a table of `(thisValue, otherValue)` tuples keyed by the shared key
+   */
+  def rightJoinUsing[V2](other: PTable[K, V2],
+                         strategy: ScrunchJoinStrategy[K, V, V2]): PTable[K, (V, V2)] = {
+    joinUsing[V2](other, strategy, JoinType.RIGHT_OUTER_JOIN)
+  }
+
+  /**
+   * Joins this table with `other` using the given strategy and `JoinType.FULL_OUTER_JOIN`.
+   *
+   * NOTE(review): for keys present on only one side, the missing tuple element is presumably
+   * null — confirm against the strategy in use.
+   *
+   * @param other the right-hand table
+   * @param strategy the Scrunch join strategy that performs the join
+   * @return a table of `(thisValue, otherValue)` tuples keyed by the shared key
+   */
+  def fullJoinUsing[V2](other: PTable[K, V2],
+                        strategy: ScrunchJoinStrategy[K, V, V2]): PTable[K, (V, V2)] = {
+    joinUsing[V2](other, strategy, JoinType.FULL_OUTER_JOIN)
+  }
+
   def cross[K2, V2](other: PTable[K2, V2]): PTable[(K, K2), (V, V2)] = {
     val ptf = getTypeFamily()
     val inter = new PTable(Cartesian.cross(this.native, other.native))


Mime
View raw message