Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C60B21085B for ; Fri, 7 Jun 2013 05:47:11 +0000 (UTC) Received: (qmail 69703 invoked by uid 500); 7 Jun 2013 05:47:11 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 69666 invoked by uid 500); 7 Jun 2013 05:47:08 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 69659 invoked by uid 99); 7 Jun 2013 05:47:06 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Jun 2013 05:47:06 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id CF5CD813FB3; Fri, 7 Jun 2013 05:47:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: <5c327bd130484522aa45dac1af18912a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-162: Add a Shard library for rebalancing the contents of PCollections Date: Fri, 7 Jun 2013 05:47:05 +0000 (UTC) Updated Branches: refs/heads/master ceaa6a5e0 -> 1d844b3f1 CRUNCH-162: Add a Shard library for rebalancing the contents of PCollections Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/1d844b3f Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/1d844b3f Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/1d844b3f Branch: refs/heads/master Commit: 1d844b3f1f107cc645319080ff3e9e73f0dd72e2 Parents: ceaa6a5 Author: Josh Wills Authored: Thu Jun 6 05:49:30 2013 -0700 Committer: 
Josh Wills Committed: Thu Jun 6 05:49:30 2013 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/crunch/lib/Aggregate.java | 22 ++++-- .../src/main/java/org/apache/crunch/lib/Shard.java | 65 +++++++++++++++ .../org/apache/crunch/util/PartitionUtils.java | 4 + 3 files changed, 84 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/1d844b3f/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java index d4109cc..d8388b3 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java @@ -33,11 +33,11 @@ import org.apache.crunch.PObject; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.fn.Aggregators; -import org.apache.crunch.fn.MapValuesFn; import org.apache.crunch.materialize.pobject.FirstElementPObject; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.util.PartitionUtils; import com.google.common.collect.Lists; @@ -52,15 +52,24 @@ public class Aggregate { * of their occurrences. */ public static PTable count(PCollection collect) { + return count(collect, PartitionUtils.getRecommendedPartitions(collect)); + } + + /** + * Returns a {@code PTable} that contains the unique elements of this collection mapped to a count + * of their occurrences. 
+ */ + public static PTable count(PCollection collect, int numPartitions) { PTypeFamily tf = collect.getTypeFamily(); return collect.parallelDo("Aggregate.count", new MapFn>() { public Pair map(S input) { return Pair.of(input, 1L); } - }, tf.tableOf(collect.getPType(), tf.longs())).groupByKey() + }, tf.tableOf(collect.getPType(), tf.longs())) + .groupByKey(numPartitions) .combineValues(Aggregators.SUM_LONGS()); } - + /** * Returns the number of elements in the provided PCollection. * @@ -252,9 +261,8 @@ public class Aggregate { public static PTable> collectValues(PTable collect) { PTypeFamily tf = collect.getTypeFamily(); final PType valueType = collect.getValueType(); - return collect.groupByKey().parallelDo("collect", - new MapValuesFn, Collection>() { - + return collect.groupByKey().mapValues("collect", + new MapFn, Collection>() { @Override public void initialize() { valueType.initialize(getConfiguration()); @@ -267,6 +275,6 @@ public class Aggregate { } return collected; } - }, tf.tableOf(collect.getKeyType(), tf.collections(collect.getValueType()))); + }, tf.collections(collect.getValueType())); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/1d844b3f/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java b/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java new file mode 100644 index 0000000..07ba0db --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pair; +import org.apache.crunch.types.PType; + +/** + * Utilities for controlling how the data in a {@code PCollection} is balanced across reducers + * and output files. + */ +public class Shard { + + /** + * Creates a {@code PCollection} that has the same contents as its input argument but will + * be written to a fixed number of output files. This is useful for map-only jobs that process + * lots of input files but only write out a small amount of output per task.
+ * + * @param pc The {@code PCollection} to rebalance + * @param numPartitions The number of output partitions to create + * @return A rebalanced {@code PCollection} with the same contents as the input + */ + public static PCollection shard(PCollection pc, int numPartitions) { + PType pt = pc.getPType(); + return Aggregate.count(pc, numPartitions).parallelDo("shards", new ShardFn(pt), pt); + } + + private static class ShardFn extends DoFn, T> { + private final PType ptype; + + public ShardFn(PType ptype) { + this.ptype = ptype; + } + + @Override + public void initialize() { + ptype.initialize(getConfiguration()); + } + + @Override + public void process(Pair input, Emitter emitter) { + for (int i = 0; i < input.second(); i++) { + emitter.emit(ptype.getDetachedValue(input.first())); + } + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/1d844b3f/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java index da8db6b..0a5c404 100644 --- a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java +++ b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java @@ -27,6 +27,10 @@ public class PartitionUtils { public static final String BYTES_PER_REDUCE_TASK = "crunch.bytes.per.reduce.task"; public static final long DEFAULT_BYTES_PER_REDUCE_TASK = 1000L * 1000L * 1000L; + public static int getRecommendedPartitions(PCollection pcollection) { + return getRecommendedPartitions(pcollection, pcollection.getPipeline().getConfiguration()); + } + public static int getRecommendedPartitions(PCollection pcollection, Configuration conf) { long bytesPerTask = conf.getLong(BYTES_PER_REDUCE_TASK, DEFAULT_BYTES_PER_REDUCE_TASK); return 1 + (int) (pcollection.getSize() / bytesPerTask);