Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 90A8010199 for ; Fri, 5 Apr 2013 19:34:50 +0000 (UTC) Received: (qmail 7837 invoked by uid 500); 5 Apr 2013 19:34:50 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 7806 invoked by uid 500); 5 Apr 2013 19:34:50 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 7797 invoked by uid 99); 5 Apr 2013 19:34:50 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Apr 2013 19:34:50 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id DCBAC1EB92; Fri, 5 Apr 2013 19:34:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: greid@apache.org To: commits@crunch.apache.org Message-Id: <8efe04ad5a0e46d79b31526084ca2a89@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-191 Detached retained values in Distinct Date: Fri, 5 Apr 2013 19:34:49 +0000 (UTC) Updated Branches: refs/heads/master 64497fa4f -> 3e513cfab CRUNCH-191 Detached retained values in Distinct Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3e513cfa Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3e513cfa Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3e513cfa Branch: refs/heads/master Commit: 3e513cfabf7d37321c868ea8007aa3c9d202e644 Parents: 64497fa Author: Gabriel Reid Authored: Fri Apr 5 21:32:31 2013 +0200 Committer: Gabriel Reid Committed: Fri Apr 5 21:32:31 2013 +0200 
---------------------------------------------------------------------- .../main/java/org/apache/crunch/lib/Distinct.java | 14 +++++++++++--- 1 files changed, 11 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/3e513cfa/crunch/src/main/java/org/apache/crunch/lib/Distinct.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java index 533f3fb..994830d 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java +++ b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java @@ -69,7 +69,7 @@ public final class Distinct { PType<S> pt = input.getPType(); PTypeFamily ptf = pt.getFamily(); return input - .parallelDo("pre-distinct", new PreDistinctFn(flushEvery), ptf.tableOf(pt, ptf.nulls())) + .parallelDo("pre-distinct", new PreDistinctFn(flushEvery, pt), ptf.tableOf(pt, ptf.nulls())) .groupByKey() .parallelDo("post-distinct", new PostDistinctFn(), pt); } @@ -84,14 +84,22 @@ public final class Distinct { private static class PreDistinctFn<S> extends DoFn<S, Pair<S, Void>> { private final Set<S> values = Sets.newHashSet(); private final int flushEvery; + private final PType<S> ptype; - public PreDistinctFn(int flushEvery) { + public PreDistinctFn(int flushEvery, PType<S> ptype) { this.flushEvery = flushEvery; + this.ptype = ptype; + } + + @Override + public void initialize() { + super.initialize(); + ptype.initialize(getConfiguration()); } @Override public void process(S input, Emitter<Pair<S, Void>> emitter) { - values.add(input); + values.add(ptype.getDetachedValue(input)); if (values.size() > flushEvery) { cleanup(emitter); }