Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A313C200B78 for ; Fri, 2 Sep 2016 19:34:57 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A1945160AAE; Fri, 2 Sep 2016 17:34:57 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C385C160A8C for ; Fri, 2 Sep 2016 19:34:56 +0200 (CEST) Received: (qmail 33905 invoked by uid 500); 2 Sep 2016 17:34:56 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 33895 invoked by uid 99); 2 Sep 2016 17:34:56 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Sep 2016 17:34:56 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 96157188590 for ; Fri, 2 Sep 2016 17:34:55 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id e7yqtygBkaua for ; Fri, 2 Sep 2016 17:34:53 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by 
mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id B778C5F56D for ; Fri, 2 Sep 2016 17:34:52 +0000 (UTC) Received: (qmail 33677 invoked by uid 99); 2 Sep 2016 17:34:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Sep 2016 17:34:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C4CC4E0579; Fri, 2 Sep 2016 17:34:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhalperi@apache.org To: commits@beam.incubator.apache.org Date: Fri, 02 Sep 2016 17:34:53 -0000 Message-Id: In-Reply-To: <230b2ba6ac1e429a8e390b5cd9371864@git.apache.org> References: <230b2ba6ac1e429a8e390b5cd9371864@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] incubator-beam git commit: Fixed Combine display data archived-at: Fri, 02 Sep 2016 17:34:57 -0000 Fixed Combine display data Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6adaebf7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6adaebf7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6adaebf7 Branch: refs/heads/master Commit: 6adaebf76a6f6caef66dc60a56b94e2734689723 Parents: 2c0bbfe Author: Ian Zhou Authored: Thu Aug 18 13:50:52 2016 -0700 Committer: Dan Halperin Committed: Fri Sep 2 10:34:37 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/transforms/Combine.java | 53 ++++++++++++++++++-- .../apache/beam/sdk/transforms/CombineTest.java | 19 +++++++ 2 files changed, 68 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6adaebf7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 26f0f66..d432e15 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.Context; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -1815,7 +1816,14 @@ public class Combine { */ public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(final int hotKeyFanout) { return new PerKeyWithHotKeyFanout<>(name, fn, fnDisplayData, - new SerializableFunction<K, Integer>() { + new SimpleFunction<K, Integer>() { + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + builder.addIfNotDefault(DisplayData.item("fanout", hotKeyFanout) + .withLabel("Key Fanout Size"), 0); + } + @Override public Integer apply(K unused) { return hotKeyFanout; @@ -1904,7 +1912,7 @@ public class Combine { new InputOrAccum.InputOrAccumCoder<InputT, AccumT>( inputCoder.getValueCoder(), accumCoder); - // A CombineFn's mergeAccumulator can be applied in a tree-like fashon. + // A CombineFn's mergeAccumulator can be applied in a tree-like fashion. // Here we shard the key using an integer nonce, combine on that partial // set of values, then drop the nonce and do a final combine of the // aggregates. 
We do this by splitting the original CombineFn into two, @@ -1944,6 +1952,16 @@ public class Combine { throws CannotProvideCoderException { return accumCoder; } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) + .withLabel("Fanout Function")); + if (hotKeyFanout instanceof HasDisplayData) { + ((HasDisplayData) hotKeyFanout).populateDisplayData(builder); + } + } }; postCombine = new KeyedCombineFn, AccumT, OutputT>() { @@ -1988,6 +2006,15 @@ public class Combine { throws CannotProvideCoderException { return accumCoder; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) + .withLabel("Fanout Function")); + if (hotKeyFanout instanceof HasDisplayData) { + ((HasDisplayData) hotKeyFanout).populateDisplayData(builder); + } + } }; } else { final KeyedCombineFnWithContext keyedFnWithContext = @@ -2028,6 +2055,15 @@ public class Combine { throws CannotProvideCoderException { return accumCoder; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) + .withLabel("Fanout Function")); + if (hotKeyFanout instanceof HasDisplayData) { + ((HasDisplayData) hotKeyFanout).populateDisplayData(builder); + } + } }; postCombine = new KeyedCombineFnWithContext, AccumT, OutputT>() { @@ -2073,6 +2109,15 @@ public class Combine { throws CannotProvideCoderException { return accumCoder; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) + .withLabel("Fanout Function")); + if (hotKeyFanout instanceof HasDisplayData) { + ((HasDisplayData) 
hotKeyFanout).populateDisplayData(builder); + } + } }; } @@ -2117,7 +2162,7 @@ public class Combine { .setCoder(KvCoder.of(KvCoder.of(inputCoder.getKeyCoder(), VarIntCoder.of()), inputCoder.getValueCoder())) .setWindowingStrategyInternal(preCombineStrategy) - .apply("PreCombineHot", Combine.perKey(hotPreCombine)) + .apply("PreCombineHot", Combine.perKey(hotPreCombine, fnDisplayData)) .apply("StripNonce", MapElements.via( new SimpleFunction<KV<KV<K, Integer>, AccumT>, KV<K, InputOrAccum<InputT, AccumT>>>() { @@ -2147,7 +2192,7 @@ public class Combine { // Combine the union of the pre-processed hot and cold key results. return PCollectionList.of(precombinedHot).and(preprocessedCold) .apply(Flatten.<KV<K, InputOrAccum<InputT, AccumT>>>pCollections()) - .apply("PostCombine", Combine.perKey(postCombine)); + .apply("PostCombine", Combine.perKey(postCombine, fnDisplayData)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6adaebf7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index 77a1d6b..be061af 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -731,6 +731,25 @@ public class CombineTest implements Serializable { displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass()))); } + @Test + @Category(RunnableOnService.class) + public void testCombinePerKeyWithHotKeyFanoutPrimitiveDisplayData() { + int hotKeyFanout = 2; + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + + CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts(); + PTransform<PCollection<KV<Integer, Integer>>, PCollection<KV<Integer, Set<Integer>>>> combine = + Combine.<Integer, Integer, Set<Integer>>perKey(combineFn).withHotKeyFanout(hotKeyFanout); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(combine, + 
KvCoder.of(VarIntCoder.of(), VarIntCoder.of())); + + assertThat("Combine.perKey.withHotKeyFanout should include the combineFn in its primitive " + + "transform", displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass()))); + assertThat("Combine.perKey.withHotKeyFanout(int) should include the fanout in its primitive " + + "transform", displayData, hasItem(hasDisplayItem("fanout", hotKeyFanout))); + } + //////////////////////////////////////////////////////////////////////////// // Test classes, for different kinds of combining fns.