crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-162: Add a Shard library for rebalancing the contents of PCollections
Date Fri, 07 Jun 2013 05:47:05 GMT
Updated Branches:
  refs/heads/master ceaa6a5e0 -> 1d844b3f1


CRUNCH-162: Add a Shard library for rebalancing the contents of PCollections


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/1d844b3f
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/1d844b3f
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/1d844b3f

Branch: refs/heads/master
Commit: 1d844b3f1f107cc645319080ff3e9e73f0dd72e2
Parents: ceaa6a5
Author: Josh Wills <jwills@apache.org>
Authored: Thu Jun 6 05:49:30 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Thu Jun 6 05:49:30 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/lib/Aggregate.java |   22 ++++--
 .../src/main/java/org/apache/crunch/lib/Shard.java |   65 +++++++++++++++
 .../org/apache/crunch/util/PartitionUtils.java     |    4 +
 3 files changed, 84 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/1d844b3f/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
index d4109cc..d8388b3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -33,11 +33,11 @@ import org.apache.crunch.PObject;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.fn.Aggregators;
-import org.apache.crunch.fn.MapValuesFn;
 import org.apache.crunch.materialize.pobject.FirstElementPObject;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.util.PartitionUtils;
 
 import com.google.common.collect.Lists;
 
@@ -52,15 +52,24 @@ public class Aggregate {
    * of their occurrences.
    */
   public static <S> PTable<S, Long> count(PCollection<S> collect) {
+    return count(collect, PartitionUtils.getRecommendedPartitions(collect));
+  }
+
+  /**
+   * Returns a {@code PTable} that contains the unique elements of this collection mapped to a count
+   * of their occurrences.
+   */
+  public static <S> PTable<S, Long> count(PCollection<S> collect, int numPartitions) {
     PTypeFamily tf = collect.getTypeFamily();
     return collect.parallelDo("Aggregate.count", new MapFn<S, Pair<S, Long>>() {
       public Pair<S, Long> map(S input) {
         return Pair.of(input, 1L);
       }
-    }, tf.tableOf(collect.getPType(), tf.longs())).groupByKey()
+    }, tf.tableOf(collect.getPType(), tf.longs()))
+        .groupByKey(numPartitions)
         .combineValues(Aggregators.SUM_LONGS());
   }
-
+  
   /**
    * Returns the number of elements in the provided PCollection.
    * 
@@ -252,9 +261,8 @@ public class Aggregate {
   public static <K, V> PTable<K, Collection<V>> collectValues(PTable<K, V> collect) {
     PTypeFamily tf = collect.getTypeFamily();
     final PType<V> valueType = collect.getValueType();
-    return collect.groupByKey().parallelDo("collect",
-        new MapValuesFn<K, Iterable<V>, Collection<V>>() {
-
+    return collect.groupByKey().mapValues("collect",
+        new MapFn<Iterable<V>, Collection<V>>() {
           @Override
           public void initialize() {
             valueType.initialize(getConfiguration());
@@ -267,6 +275,6 @@ public class Aggregate {
             }
             return collected;
           }
-        }, tf.tableOf(collect.getKeyType(), tf.collections(collect.getValueType())));
+        }, tf.collections(collect.getValueType()));
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1d844b3f/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java b/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java
new file mode 100644
index 0000000..07ba0db
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+
+/**
+ * Utilities for controlling how the data in a {@code PCollection} is balanced across reducers
+ * and output files.
+ */
+public class Shard {
+
+  /**
+   * Creates a {@code PCollection<T>} that has the same contents as its input argument but will
+   * be written to a fixed number of output files. This is useful for map-only jobs that process
+   * lots of input files but only write out a small amount of input per task.
+   * 
+   * @param pc The {@code PCollection<T>} to rebalance
+   * @param numPartitions The number of output partitions to create
+   * @return A rebalanced {@code PCollection<T>} with the same contents as the input
+   */
+  public static <T> PCollection<T> shard(PCollection<T> pc, int numPartitions) {
+    PType<T> pt = pc.getPType();
+    return Aggregate.count(pc, numPartitions).parallelDo("shards", new ShardFn<T>(pt), pt);
+  }
+  
+  private static class ShardFn<T> extends DoFn<Pair<T, Long>, T> {
+    private final PType<T> ptype;
+    
+    public ShardFn(PType<T> ptype) {
+      this.ptype = ptype;
+    }
+    
+    @Override
+    public void initialize() {
+      ptype.initialize(getConfiguration());
+    }
+    
+    @Override
+    public void process(Pair<T, Long> input, Emitter<T> emitter) {
+      for (int i = 0; i < input.second(); i++) {
+        emitter.emit(ptype.getDetachedValue(input.first()));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/1d844b3f/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
index da8db6b..0a5c404 100644
--- a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
+++ b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
@@ -27,6 +27,10 @@ public class PartitionUtils {
   public static final String BYTES_PER_REDUCE_TASK = "crunch.bytes.per.reduce.task";
   public static final long DEFAULT_BYTES_PER_REDUCE_TASK = 1000L * 1000L * 1000L;
   
+  public static <T> int getRecommendedPartitions(PCollection<T> pcollection) {
+    return getRecommendedPartitions(pcollection, pcollection.getPipeline().getConfiguration());
+  }
+  
   public static <T> int getRecommendedPartitions(PCollection<T> pcollection, Configuration conf) {
     long bytesPerTask = conf.getLong(BYTES_PER_REDUCE_TASK, DEFAULT_BYTES_PER_REDUCE_TASK);
     return 1 + (int) (pcollection.getSize() / bytesPerTask);


Mime
View raw message