crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-238: Add numReducers options to the SecondarySort lib
Date Thu, 18 Jul 2013 01:20:12 GMT
Updated Branches:
  refs/heads/master a3dd33f45 -> 643e41063


CRUNCH-238: Add numReducers options to the SecondarySort lib


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/643e4106
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/643e4106
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/643e4106

Branch: refs/heads/master
Commit: 643e4106374c95b2f17c63da3800dbdde991e341
Parents: a3dd33f
Author: Josh Wills <jwills@apache.org>
Authored: Wed Jul 17 18:17:08 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Wed Jul 17 18:17:08 2013 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/lib/SecondarySort.java    | 58 +++++++++++++++-----
 1 file changed, 45 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/643e4106/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java b/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java
index 54b4396..32bff38 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java
@@ -47,24 +47,54 @@ public class SecondarySort {
    * Perform a secondary sort on the given {@code PTable} instance and then apply a
    * {@code DoFn} to the resulting sorted data to yield an output {@code PCollection<T>}.
    */
-  public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input,
-      DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype) {
-    return prepare(input)
-        .parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, T>(doFn), ptype);
+  public static <K, V1, V2, T> PCollection<T> sortAndApply(
+      PTable<K, Pair<V1, V2>> input,
+      DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn,
+      PType<T> ptype) {
+    return sortAndApply(input, doFn, ptype, -1);
   }
   
   /**
    * Perform a secondary sort on the given {@code PTable} instance and then apply a
+   * {@code DoFn} to the resulting sorted data to yield an output {@code PCollection<T>}, using
+   * the given number of reducers.
+   */
+  public static <K, V1, V2, T> PCollection<T> sortAndApply(
+      PTable<K, Pair<V1, V2>> input,
+      DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn,
+      PType<T> ptype,
+      int numReducers) {
+    return prepare(input, numReducers)
+        .parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, T>(doFn), ptype);
+  }
+ 
+  /**
+   * Perform a secondary sort on the given {@code PTable} instance and then apply a
    * {@code DoFn} to the resulting sorted data to yield an output {@code PTable<U, V>}.
    */
-  public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input,
-      DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype) {
-    return prepare(input)
+  public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(
+      PTable<K, Pair<V1, V2>> input,
+      DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn,
+      PTableType<U, V> ptype) {
+    return sortAndApply(input, doFn, ptype, -1);
+  }
+  
+  /**
+   * Perform a secondary sort on the given {@code PTable} instance and then apply a
+   * {@code DoFn} to the resulting sorted data to yield an output {@code PTable<U, V>}, using
+   * the given number of reducers.
+   */
+  public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(
+      PTable<K, Pair<V1, V2>> input,
+      DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn,
+      PTableType<U, V> ptype,
+      int numReducers) {
+    return prepare(input, numReducers)
         .parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, Pair<U, V>>(doFn), ptype);
   }
   
   private static <K, V1, V2> PGroupedTable<Pair<K, V1>, Pair<V1, V2>> prepare(
-      PTable<K, Pair<V1, V2>> input) {
+      PTable<K, Pair<V1, V2>> input, int numReducers) {
     PTypeFamily ptf = input.getTypeFamily();
     PType<Pair<V1, V2>> valueType = input.getValueType();
     PTableType<Pair<K, V1>, Pair<V1, V2>> inter = ptf.tableOf(
@@ -72,12 +102,14 @@ public class SecondarySort {
         valueType);
     PTableType<K, Collection<Pair<V1, V2>>> out = ptf.tableOf(input.getKeyType(),
         ptf.collections(input.getValueType()));
+    GroupingOptions.Builder gob = GroupingOptions.builder()
+        .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf))
+        .partitionerClass(JoinUtils.getPartitionerClass(ptf));
+    if (numReducers > 0) {
+      gob.numReducers(numReducers);
+    }
     return input.parallelDo("SecondarySort.format", new SSFormatFn<K, V1, V2>(), inter)
-        .groupByKey(
-            GroupingOptions.builder()
-            .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf))
-            .partitionerClass(JoinUtils.getPartitionerClass(ptf))
-            .build());
+        .groupByKey(gob.build());
   }
   
   private static class SSFormatFn<K, V1, V2> extends MapFn<Pair<K, Pair<V1, V2>>, Pair<Pair<K, V1>, Pair<V1, V2>>> {


Mime
View raw message