Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 86F77200CAE for ; Tue, 6 Jun 2017 15:28:47 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 85996160BD3; Tue, 6 Jun 2017 13:28:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7E53B160BC3 for ; Tue, 6 Jun 2017 15:28:46 +0200 (CEST) Received: (qmail 37677 invoked by uid 500); 6 Jun 2017 13:28:45 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 37668 invoked by uid 99); 6 Jun 2017 13:28:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Jun 2017 13:28:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8E4F1DFC2E; Tue, 6 Jun 2017 13:28:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aljoscha@apache.org To: commits@beam.apache.org Date: Tue, 06 Jun 2017 13:28:45 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] beam git commit: Remove the FnOutputT parameter from DoFnOperator archived-at: Tue, 06 Jun 2017 13:28:47 -0000 Repository: beam Updated Branches: refs/heads/master 88f78fa2f -> aebd3a4c5 Remove the FnOutputT parameter from DoFnOperator Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e8f26085 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e8f26085 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e8f26085 Branch: refs/heads/master Commit: e8f26085e889f8f618c0961a5458cbc42b432c01 Parents: b0601fd Author: JingsongLi Authored: Tue Jun 6 17:31:09 2017 +0800 Committer: Aljoscha Krettek Committed: Tue Jun 6 14:33:36 2017 +0200 ---------------------------------------------------------------------- .../FlinkStreamingTransformTranslators.java | 10 +++++----- .../wrappers/streaming/DoFnOperator.java | 20 ++++++++++---------- .../streaming/SplittableDoFnOperator.java | 12 ++++++------ .../wrappers/streaming/WindowDoFnOperator.java | 2 +- .../beam/runners/flink/PipelineOptionsTest.java | 6 +++--- .../flink/streaming/DoFnOperatorTest.java | 11 +++++------ 6 files changed, 30 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index d8c3049..2a7c5d6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -332,7 +332,7 @@ class FlinkStreamingTransformTranslators { static class ParDoTranslationHelper { interface DoFnOperatorFactory { - DoFnOperator createDoFnOperator( + DoFnOperator createDoFnOperator( DoFn doFn, String stepName, List> sideInputs, @@ -395,7 +395,7 @@ class FlinkStreamingTransformTranslators { context.getCoder((PCollection) outputs.get(mainOutputTag))); if (sideInputs.isEmpty()) { - DoFnOperator doFnOperator = + DoFnOperator doFnOperator = doFnOperatorFactory.createDoFnOperator( doFn, context.getCurrentTransform().getFullName(), @@ -416,7 +416,7 @@ class FlinkStreamingTransformTranslators { Tuple2>, DataStream> transformedSideInputs = transformSideInputs(sideInputs, context); - DoFnOperator doFnOperator = + DoFnOperator doFnOperator = doFnOperatorFactory.createDoFnOperator( doFn, context.getCurrentTransform().getFullName(), @@ -493,7 +493,7 @@ class FlinkStreamingTransformTranslators { context, new ParDoTranslationHelper.DoFnOperatorFactory() { @Override - public DoFnOperator createDoFnOperator( + public DoFnOperator createDoFnOperator( DoFn doFn, String stepName, List> sideInputs, @@ -547,7 +547,7 @@ class FlinkStreamingTransformTranslators { @Override public DoFnOperator< KeyedWorkItem>, - OutputT, OutputT> createDoFnOperator( + OutputT> createDoFnOperator( DoFn< KeyedWorkItem>, OutputT> doFn, http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 8c27ed9..350f323 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -94,21 +94,21 @@ import org.joda.time.Instant; * Flink operator for executing {@link DoFn DoFns}. * * @param the input type of the {@link DoFn} - * @param the output type of the {@link DoFn} + * @param the output type of the {@link DoFn} * @param the output type of the operator, this can be different from the fn output * type when we have additional tagged outputs */ -public class DoFnOperator +public class DoFnOperator extends AbstractStreamOperator> implements OneInputStreamOperator, WindowedValue>, TwoInputStreamOperator, RawUnionValue, WindowedValue>, KeyGroupCheckpointedOperator, Triggerable { - protected DoFn doFn; + protected DoFn doFn; protected final SerializedPipelineOptions serializedOptions; - protected final TupleTag mainOutputTag; + protected final TupleTag mainOutputTag; protected final List> additionalOutputTags; protected final Collection> sideInputs; @@ -118,8 +118,8 @@ public class DoFnOperator protected final OutputManagerFactory outputManagerFactory; - protected transient DoFnRunner doFnRunner; - protected transient PushbackSideInputDoFnRunner pushbackDoFnRunner; + protected transient DoFnRunner doFnRunner; + protected transient PushbackSideInputDoFnRunner pushbackDoFnRunner; protected transient SideInputHandler sideInputHandler; @@ -127,7 +127,7 @@ public class DoFnOperator protected transient DoFnRunners.OutputManager outputManager; - private transient DoFnInvoker doFnInvoker; + private transient DoFnInvoker doFnInvoker; protected transient long currentInputWatermark; @@ -156,10 +156,10 @@ public class DoFnOperator private transient Optional pushedBackWatermark; public DoFnOperator( - DoFn doFn, + DoFn doFn, String stepName, Coder> inputCoder, - TupleTag mainOutputTag, + TupleTag mainOutputTag, List> additionalOutputTags, OutputManagerFactory outputManagerFactory, WindowingStrategy windowingStrategy, @@ -192,7 +192,7 @@ public class DoFnOperator // allow overriding this in WindowDoFnOperator because this one dynamically creates // the DoFn - protected DoFn getDoFn() { + protected DoFn getDoFn() { return doFn; } http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index 4ac2ff5..5d08eba 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -55,19 +55,19 @@ import org.joda.time.Instant; * the {@code @ProcessElement} method of a splittable {@link DoFn}. */ public class SplittableDoFnOperator< - InputT, FnOutputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> extends DoFnOperator< - KeyedWorkItem>, FnOutputT, OutputT> { + KeyedWorkItem>, OutputT> { private transient ScheduledExecutorService executorService; public SplittableDoFnOperator( - DoFn>, FnOutputT> doFn, + DoFn>, OutputT> doFn, String stepName, Coder< WindowedValue< KeyedWorkItem>>> inputCoder, - TupleTag mainOutputTag, + TupleTag mainOutputTag, List> additionalOutputTags, OutputManagerFactory outputManagerFactory, WindowingStrategy windowingStrategy, @@ -120,10 +120,10 @@ public class SplittableDoFnOperator< new OutputAndTimeBoundedSplittableProcessElementInvoker<>( doFn, serializedOptions.getPipelineOptions(), - new OutputWindowedValue() { + new OutputWindowedValue() { @Override public void outputWindowedValue( - FnOutputT output, + OutputT output, Instant timestamp, Collection windows, PaneInfo pane) { http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index ea578b9..78d585e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -46,7 +46,7 @@ import org.apache.flink.streaming.api.operators.InternalTimer; * Flink operator for executing window {@link DoFn DoFns}. */ public class WindowDoFnOperator - extends DoFnOperator, KV, KV> { + extends DoFnOperator, KV> { private final SystemReduceFn systemReduceFn; http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index bc0b1c2..d0281ec 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -136,7 +136,7 @@ public class PipelineOptionsTest { @Test(expected = Exception.class) public void parDoBaseClassPipelineOptionsNullTest() { - DoFnOperator doFnOperator = new DoFnOperator<>( + DoFnOperator doFnOperator = new DoFnOperator<>( new TestDoFn(), "stepName", WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), @@ -157,7 +157,7 @@ public class PipelineOptionsTest { @Test public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception { - DoFnOperator doFnOperator = new DoFnOperator<>( + DoFnOperator doFnOperator = new DoFnOperator<>( new TestDoFn(), "stepName", WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), @@ -173,7 +173,7 @@ public class PipelineOptionsTest { final byte[] serialized = SerializationUtils.serialize(doFnOperator); @SuppressWarnings("unchecked") - DoFnOperator deserialized = SerializationUtils.deserialize(serialized); + DoFnOperator deserialized = SerializationUtils.deserialize(serialized); TypeInformation> typeInformation = TypeInformation.of( new TypeHint>() {}); http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 132242e..ad9d236 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -111,7 +111,7 @@ public class DoFnOperatorTest { TupleTag outputTag = new TupleTag<>("main-output"); - DoFnOperator doFnOperator = new DoFnOperator<>( + DoFnOperator doFnOperator = new DoFnOperator<>( new IdentityDoFn(), "stepName", windowedValueCoder, @@ -155,7 +155,7 @@ public class DoFnOperatorTest { .put(additionalOutput2, new OutputTag(additionalOutput2.getId()){}) .build(); - DoFnOperator doFnOperator = new DoFnOperator<>( + DoFnOperator doFnOperator = new DoFnOperator<>( new MultiOutputDoFn(additionalOutput1, additionalOutput2), "stepName", windowedValueCoder, @@ -223,7 +223,7 @@ public class DoFnOperatorTest { TupleTag outputTag = new TupleTag<>("main-output"); - DoFnOperator doFnOperator = new DoFnOperator<>( + DoFnOperator doFnOperator = new DoFnOperator<>( fn, "stepName", windowedValueCoder, @@ -335,8 +335,7 @@ public class DoFnOperatorTest { TupleTag> outputTag = new TupleTag<>("main-output"); - DoFnOperator< - KV, KV, KV> doFnOperator = + DoFnOperator, KV> doFnOperator = new DoFnOperator<>( fn, "stepName", @@ -433,7 +432,7 @@ public class DoFnOperatorTest { keyCoder = StringUtf8Coder.of(); } - DoFnOperator doFnOperator = new DoFnOperator<>( + DoFnOperator doFnOperator = new DoFnOperator<>( new IdentityDoFn(), "stepName", windowedValueCoder,