crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-447: Secondary sort and shard for Scrunch
Date Thu, 17 Jul 2014 20:03:19 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 0e7e0d0b0 -> 3f13ee65c


CRUNCH-447: Secondary sort and shard for Scrunch


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3f13ee65
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3f13ee65
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3f13ee65

Branch: refs/heads/master
Commit: 3f13ee65c9debcf6bd7366607f58beae6c73ffe2
Parents: 0e7e0d0
Author: Josh Wills <jwills@apache.org>
Authored: Wed Jul 16 15:27:04 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Thu Jul 17 12:26:41 2014 -0700

----------------------------------------------------------------------
 .../src/it/resources/secondary_sort_input.txt   |  7 ---
 .../crunch/scrunch/SecondarySortTest.scala      | 50 ++++++++++++++++
 .../apache/crunch/scrunch/PCollectionLike.scala |  9 +++
 .../org/apache/crunch/scrunch/PTable.scala      | 63 +++++++++++++++++++-
 .../src/main/resources/secondary_sort_input.txt |  7 +++
 5 files changed, 127 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/3f13ee65/crunch-core/src/it/resources/secondary_sort_input.txt
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/secondary_sort_input.txt b/crunch-core/src/it/resources/secondary_sort_input.txt
deleted file mode 100644
index 3c7be93..0000000
--- a/crunch-core/src/it/resources/secondary_sort_input.txt
+++ /dev/null
@@ -1,7 +0,0 @@
-one,1,1 
-one,2,-3 
-two,4,5 
-two,2,6 
-two,1,7,9 
-three,0,-1 
-one,-5,10 

http://git-wip-us.apache.org/repos/asf/crunch/blob/3f13ee65/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/SecondarySortTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/SecondarySortTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/SecondarySortTest.scala
new file mode 100644
index 0000000..1ae1e9c
--- /dev/null
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/SecondarySortTest.scala
@@ -0,0 +1,50 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.crunch.scrunch
+
+import org.junit.Assert._
+import org.junit.Test
+
+class SecondarySortTest extends CrunchSuite with Serializable {
+
+  @Test def testSecondarySortAvros {
+    runSecondarySort(Avros)
+  }
+
+  @Test def testSecondarySortWritables {
+    runSecondarySort(Writables)
+  }
+
+  def runSecondarySort(ptf: PTypeFamily) {
+    val p = Pipeline.mapReduce(classOf[SecondarySortTest], tempDir.getDefaultConfiguration)
+    val inputFile = tempDir.copyResourceFileName("secondary_sort_input.txt")
+    val lines = p.read(From.textFile(inputFile, ptf.strings)).map(input => {
+      val pieces = input.split(",")
+      (pieces(0), (pieces(1).trim.toInt, pieces(2).trim.toInt))
+    })
+    .secondarySortAndMap((k, iter: Iterable[(Int, Int)]) => {
+      List(k, iter.mkString(",")).mkString(",")
+    })
+    .materialize
+    assertEquals(List("one,(-5,10),(1,1),(2,-3)", "three,(0,-1)", "two,(1,7),(2,6),(4,5)"), lines.toList)
+    p.done
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3f13ee65/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
index 3d35bc6..b2216f1 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
@@ -20,6 +20,7 @@ package org.apache.crunch.scrunch
 import org.apache.crunch.{PCollection => JCollection, Pair => JPair, _}
 import org.apache.crunch.types.{PType, PTableType}
 import org.apache.crunch.types.writable.WritableTypeFamily
+import org.apache.crunch.lib.Shard
 
 /**
  * Base trait for PCollection-like entities in Scrunch, including PTables and PGroupedTables.
@@ -269,6 +270,14 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
   }
 
   /**
+   * Re-partitions this instance into the given number of partitions.
+   *
+   * @param numPartitions the number of partitions to use
+   * @return a re-partitioned version of the data in this instance
+   */
+  def shard(numPartitions: Int) = wrap(Shard.shard(native, numPartitions))
+
+  /**
    * Gets the number of elements represented by this PCollection.
    *
    * @return The number of elements in this PCollection.

http://git-wip-us.apache.org/repos/asf/crunch/blob/3f13ee65/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
index 50ddc5c..eaba071 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
@@ -17,13 +17,14 @@
  */
 package org.apache.crunch.scrunch
 
+import java.lang.{Iterable => JIterable}
 import java.util.{Collection => JCollect}
 
 import scala.collection.JavaConversions._
 
 import org.apache.crunch.{PCollection => JCollection, PTable => JTable, Pair => CPair, _}
-import org.apache.crunch.lib.{Cartesian, Aggregate, Cogroup, PTables}
-import org.apache.crunch.lib.join.{DefaultJoinStrategy, JoinType}
+import org.apache.crunch.lib._
+import org.apache.crunch.lib.join.{JoinUtils, DefaultJoinStrategy, JoinType}
 import org.apache.crunch.types.{PTableType, PType}
 import scala.collection.Iterable
 import org.apache.hadoop.mapreduce.TaskInputOutputContext
@@ -54,6 +55,30 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
     filter((k, v) => pf.isDefinedAt((k, v))).map((k, v) => pf((k, v)))(pt, b)
   }
 
+  def secondarySortAndMap[K2, VX, T, To](f: (K, Iterable[(K2, VX)]) => T, numReducers: Int = -1)
+    (implicit ev: <:<[V, (K2, VX)], pt: PTypeH[T], b: CanParallelTransform[T, To]) = {
+    b(prepareSecondarySort(numReducers), secSortMap(f), pt.get(getTypeFamily()))
+  }
+
+  def secondarySortAndFlatMap[K2, VX, T, To](f: (K, Iterable[(K2, VX)]) => TraversableOnce[T], numReducers: Int = -1)
+    (implicit ev: <:<[V, (K2, VX)], pt: PTypeH[T], b: CanParallelTransform[T, To]) = {
+    b(prepareSecondarySort(numReducers), secSortFlatMap(f), pt.get(getTypeFamily()))
+  }
+
+  private def prepareSecondarySort[K2, VX](numReducers: Int)(implicit ev: <:<[V, (K2, VX)]): PGroupedTable[CPair[K, K2], (K2, VX)] = {
+    val basePTF = getTypeFamily().ptf
+    val gopts = GroupingOptions.builder()
+      .requireSortedKeys()
+      .groupingComparatorClass(JoinUtils.getGroupingComparator(basePTF))
+      .partitionerClass(JoinUtils.getPartitionerClass(basePTF))
+    if (numReducers > 0) {
+      gopts.numReducers(numReducers)
+    }
+    val kt = basePTF.pairs(keyType(), valueType().getSubTypes.get(0).asInstanceOf[PType[K2]])
+    val ptt = basePTF.tableOf(kt, valueType.asInstanceOf[PType[(K2, VX)]])
+    parallelDo(new SPrepareSSFn[K, V, K2, VX], ptt).groupByKey(gopts.build())
+  }
+
   def mapValues[T](f: V => T)(implicit pt: PTypeH[T]) = {
     val ptf = getTypeFamily()
     val ptype = ptf.tableOf(native.getKeyType(), pt.get(ptf))
@@ -262,6 +287,30 @@ trait SMapTableKeysFn[K, V, T] extends MapFn[CPair[K, V], CPair[T, V]] with Func
   override def map(input: CPair[K, V]) = CPair.of(apply(input.first()), input.second())
 }
 
+private class SPrepareSSFn[K, V, K2, VX] extends MapFn[CPair[K, V], CPair[CPair[K, K2], (K2, VX)]] {
+  override def map(input: CPair[K, V]): CPair[CPair[K, K2], (K2, VX)] = {
+    val sec = input.second().asInstanceOf[(K2, VX)]
+    CPair.of(CPair.of(input.first(), sec._1), sec)
+  }
+}
+
+trait SecSortFlatMapFn[K, K2, VX, T] extends DoFn[CPair[CPair[K, K2], JIterable[(K2, VX)]], T]
+  with Function2[K, Iterable[(K2, VX)], TraversableOnce[T]] {
+  override def process(input: CPair[CPair[K, K2], JIterable[(K2, VX)]], emitter: Emitter[T]) {
+    val iter = iterableAsScalaIterable(input.second())
+    for (v <- apply(input.first().first(), iter)) {
+      emitter.emit(v)
+    }
+  }
+}
+
+trait SecSortMapFn[K, K2, VX, T] extends MapFn[CPair[CPair[K, K2], JIterable[(K2, VX)]], T]
+  with Function2[K, Iterable[(K2, VX)], T] {
+  override def map(input: CPair[CPair[K, K2], JIterable[(K2, VX)]]) = {
+    apply(input.first().first(), iterableAsScalaIterable(input.second()))
+  }
+}
+
 object PTable {
   type TIOC = TaskInputOutputContext[_, _, _, _]
 
@@ -316,4 +365,14 @@ object PTable {
   def incValueFn[K, V, T](fn: V => T) = new Function1[CPair[K, V], T] with Serializable {
     def apply(p: CPair[K, V]): T = fn(p.second())
   }
+
+  def secSortFlatMap[K, K2, VX, T](fn: (K, Iterable[(K2, VX)]) => TraversableOnce[T]) = {
+    new SecSortFlatMapFn[K, K2, VX, T] {
+      def apply(k: K, v: Iterable[(K2, VX)]) = fn(k, v)
+    }
+  }
+
+  def secSortMap[K, K2, VX, T](fn: (K, Iterable[(K2, VX)]) => T) = new SecSortMapFn[K, K2, VX, T] {
+    def apply(k: K, v: Iterable[(K2, VX)]) = fn(k, v)
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/3f13ee65/crunch-test/src/main/resources/secondary_sort_input.txt
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/resources/secondary_sort_input.txt b/crunch-test/src/main/resources/secondary_sort_input.txt
new file mode 100644
index 0000000..3c7be93
--- /dev/null
+++ b/crunch-test/src/main/resources/secondary_sort_input.txt
@@ -0,0 +1,7 @@
+one,1,1 
+one,2,-3 
+two,4,5 
+two,2,6 
+two,1,7,9 
+three,0,-1 
+one,-5,10 


Mime
View raw message