Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 388E3200CB4 for ; Mon, 12 Jun 2017 18:55:23 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 376B0160BDE; Mon, 12 Jun 2017 16:55:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D8B5F160BF4 for ; Mon, 12 Jun 2017 18:55:21 +0200 (CEST) Received: (qmail 11941 invoked by uid 500); 12 Jun 2017 16:55:20 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 8959 invoked by uid 99); 12 Jun 2017 16:55:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Jun 2017 16:55:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CE011E03B3; Mon, 12 Jun 2017 16:55:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.apache.org Date: Mon, 12 Jun 2017 16:56:00 -0000 Message-Id: <6e7ae1e3d03740639ceca79a0495713b@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [45/50] [abbrv] beam git commit: Port DirectGroupByKey to SDK-agnostic APIs archived-at: Mon, 12 Jun 2017 16:55:23 -0000 Port DirectGroupByKey to SDK-agnostic APIs Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02dbaefd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02dbaefd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02dbaefd Branch: refs/heads/gearpump-runner Commit: 02dbaefd2bbad0f0ff0b87469d184137b220fae7 Parents: 8c5b57e Author: Kenneth Knowles Authored: Fri May 26 14:27:23 2017 -0700 Committer: Kenneth Knowles Committed: Fri Jun 9 19:56:52 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/direct/DirectGroupByKey.java | 13 +++++++------ .../direct/DirectGroupByKeyOverrideFactory.java | 14 +++++++++++--- 2 files changed, 18 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/02dbaefd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java index 2fc0dd4..06b8e29 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java @@ -36,13 +36,17 @@ import org.apache.beam.sdk.values.WindowingStrategy; class DirectGroupByKey extends ForwardingPTransform>, PCollection>>> { - private final GroupByKey original; + private final PTransform>, PCollection>>> original; static final String DIRECT_GBKO_URN = "urn:beam:directrunner:transforms:gbko:v1"; static final String DIRECT_GABW_URN = "urn:beam:directrunner:transforms:gabw:v1"; + private final WindowingStrategy outputWindowingStrategy; - DirectGroupByKey(GroupByKey from) { - this.original = from; + DirectGroupByKey( + PTransform>, PCollection>>> original, + WindowingStrategy outputWindowingStrategy) { + this.original = original; + this.outputWindowingStrategy = outputWindowingStrategy; } @Override @@ -57,9 +61,6 @@ class DirectGroupByKey // key/value input elements and the window merge operation of the // window function associated with the input PCollection. WindowingStrategy inputWindowingStrategy = input.getWindowingStrategy(); - // Update the windowing strategy as appropriate. - WindowingStrategy outputWindowingStrategy = - original.updateWindowingStrategy(inputWindowingStrategy); // By default, implement GroupByKey via a series of lower-level operations. return input http://git-wip-us.apache.org/repos/asf/beam/blob/02dbaefd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java index c2eb5e7..9c2de3d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java @@ -17,26 +17,34 @@ */ package org.apache.beam.runners.direct; +import com.google.common.collect.Iterables; import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; /** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */ final class DirectGroupByKeyOverrideFactory extends SingleInputOutputOverrideFactory< - PCollection>, PCollection>>, GroupByKey> { + PCollection>, PCollection>>, + PTransform>, PCollection>>>> { @Override public PTransformReplacement>, PCollection>>> getReplacementTransform( AppliedPTransform< - PCollection>, PCollection>>, GroupByKey> + PCollection>, PCollection>>, + PTransform>, PCollection>>>> transform) { + + PCollection>> output = + (PCollection>>) Iterables.getOnlyElement(transform.getOutputs().values()); + return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), - new DirectGroupByKey<>(transform.getTransform())); + new DirectGroupByKey<>(transform.getTransform(), output.getWindowingStrategy())); } }