Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1601C10F37 for ; Sat, 29 Mar 2014 17:02:55 +0000 (UTC) Received: (qmail 76263 invoked by uid 500); 29 Mar 2014 17:02:54 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 76217 invoked by uid 500); 29 Mar 2014 17:02:52 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 76198 invoked by uid 99); 29 Mar 2014 17:02:51 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 29 Mar 2014 17:02:51 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 0E734838843; Sat, 29 Mar 2014 17:02:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chaoshi@apache.org To: commits@crunch.apache.org Date: Sat, 29 Mar 2014 17:02:52 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: CRUNCH-351: Improve performance of Shard#shard(). CRUNCH-351: Improve performance of Shard#shard(). 
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/4a9bacad Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/4a9bacad Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/4a9bacad Branch: refs/heads/apache-crunch-0.8 Commit: 4a9bacadc0395de809524765a0787fa95087f014 Parents: a62cfb9 Author: Chao Shi Authored: Sat Feb 22 18:22:08 2014 +0800 Committer: Chao Shi Committed: Sun Mar 30 00:33:39 2014 +0800 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/lib/ShardIT.java | 53 ++++++++++++++++++++ .../main/java/org/apache/crunch/lib/Shard.java | 32 +++++------- 2 files changed, 66 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/4a9bacad/crunch-core/src/it/java/org/apache/crunch/lib/ShardIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/ShardIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/ShardIT.java new file mode 100644 index 0000000..248c260 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/lib/ShardIT.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMultiset; +import org.apache.commons.io.FileUtils; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.test.TemporaryPath; +import org.junit.Rule; +import org.junit.Test; + +import java.io.File; + +import static org.junit.Assert.assertEquals; + +public class ShardIT { + + @Rule + public TemporaryPath tempDir = new TemporaryPath("crunch.tmp.dir", "hadoop.tmp.dir"); + + @Test + public void testShard() throws Exception { + File inDir = tempDir.getFile("in"); + FileUtils.writeLines(new File(inDir, "part1"), ImmutableList.of("part1", "part1")); + FileUtils.writeLines(new File(inDir, "part2"), ImmutableList.of("part2")); + Pipeline pipeline = new MRPipeline(ShardIT.class); + PCollection in = pipeline.read(From.textFile(inDir.getPath())); + // We can only test on 1 shard here, as local MR does not support multiple reducers. 
+ PCollection out = Shard.shard(in, 1); + assertEquals( + ImmutableMultiset.copyOf(out.materialize()), + ImmutableMultiset.of("part1", "part1", "part2")); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/4a9bacad/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java b/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java index 07ba0db..aab791b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java @@ -17,11 +17,8 @@ */ package org.apache.crunch.lib; -import org.apache.crunch.DoFn; -import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; -import org.apache.crunch.Pair; -import org.apache.crunch.types.PType; /** * Utilities for controlling how the data in a {@code PCollection} is balanced across reducers @@ -39,27 +36,24 @@ public class Shard { * @return A rebalanced {@code PCollection} with the same contents as the input */ public static PCollection shard(PCollection pc, int numPartitions) { - PType pt = pc.getPType(); - return Aggregate.count(pc, numPartitions).parallelDo("shards", new ShardFn(pt), pt); + return pc.by(new ShardFn(), pc.getTypeFamily().ints()) + .groupByKey(numPartitions) + .ungroup() + .values(); } - private static class ShardFn extends DoFn, T> { - private final PType ptype; - - public ShardFn(PType ptype) { - this.ptype = ptype; - } - + private static class ShardFn extends MapFn { + + private int count; + @Override public void initialize() { - ptype.initialize(getConfiguration()); + count = 0; } - + @Override - public void process(Pair input, Emitter emitter) { - for (int i = 0; i < input.second(); i++) { - emitter.emit(ptype.getDetachedValue(input.first())); - } + public Integer map(T input) { + return count++; } } }