Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AC7C1200BE2 for ; Thu, 15 Dec 2016 23:29:18 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id AAF98160B15; Thu, 15 Dec 2016 22:29:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 35C20160B13 for ; Thu, 15 Dec 2016 23:29:17 +0100 (CET) Received: (qmail 9873 invoked by uid 500); 15 Dec 2016 22:29:16 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 9864 invoked by uid 99); 15 Dec 2016 22:29:16 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Dec 2016 22:29:16 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id E6F0EC01DD for ; Thu, 15 Dec 2016 22:29:15 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id cQa6KxWhbID5 for ; Thu, 15 Dec 2016 22:29:09 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 6EBE35F19B for ; Thu, 15 Dec 2016 22:29:07 +0000 (UTC) Received: (qmail 8345 invoked by uid 99); 15 Dec 2016 22:29:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Dec 2016 22:29:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 82CCFDFBFF; Thu, 15 Dec 2016 22:29:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Thu, 15 Dec 2016 22:29:08 -0000 Message-Id: In-Reply-To: <698eac860468431bb31ea4e365d26e61@git.apache.org> References: <698eac860468431bb31ea4e365d26e61@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/10] incubator-beam git commit: Moves DoFnAdapters to runners-core archived-at: Thu, 15 Dec 2016 22:29:18 -0000 Moves DoFnAdapters to runners-core Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/33ed3238 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/33ed3238 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/33ed3238 Branch: refs/heads/master Commit: 33ed3238e2b3899cff061be3056c5cc29fc60a04 Parents: ca1dd7a Author: Eugene Kirpichov Authored: Fri Dec 9 17:28:16 2016 -0800 Committer: Eugene Kirpichov Committed: Thu Dec 15 13:59:11 2016 -0800 ---------------------------------------------------------------------- .../apex/translation/WindowBoundTranslator.java | 2 +- .../operators/ApexGroupByKeyOperator.java | 2 +- .../operators/ApexParDoOperator.java | 2 +- .../apache/beam/runners/core/DoFnAdapters.java | 344 +++++++++++++++++++ .../beam/runners/core/SimpleOldDoFnRunner.java | 4 +- .../core/GroupAlsoByWindowsProperties.java | 2 +- .../functions/FlinkDoFnFunction.java | 2 +- .../functions/FlinkMultiOutputDoFnFunction.java | 2 +- .../functions/FlinkProcessContextBase.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 2 +- .../sdk/transforms/AggregatorRetriever.java | 13 +- .../beam/sdk/transforms/DoFnAdapters.java | 340 ------------------ .../org/apache/beam/sdk/transforms/OldDoFn.java | 2 +- .../apache/beam/sdk/transforms/NoOpOldDoFn.java | 2 +- 14 files changed, 367 insertions(+), 354 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java index 33b9269..ef049e1 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java @@ -22,8 +22,8 @@ import java.util.Collections; import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator; import org.apache.beam.runners.core.AssignWindowsDoFn; +import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Window; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 48ac177..4af7ff0 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -413,7 +413,7 @@ public class ApexGroupByKeyOperator implements Operator { } @Override - protected Aggregator createAggregatorInternal( + public Aggregator createAggregatorInternal( String name, Combine.CombineFn combiner) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 08f062d..1e76949 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -37,6 +37,7 @@ import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translation.utils.NoOpStepContext; import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable; +import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; @@ -48,7 +49,6 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.ExecutionContext; import org.apache.beam.sdk.util.NullSideInputReader; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java new file mode 100644 index 0000000..0f5624f --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.io.IOException; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.AggregatorRetriever; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Context; +import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; +import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}. + * + * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link + * DoFnInvoker}) rather than via {@link OldDoFn}. + */ +@Deprecated +public class DoFnAdapters { + /** Should not be instantiated. */ + private DoFnAdapters() {} + + /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public static OldDoFn toOldDoFn(DoFn fn) { + DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass()); + if (signature.processElement().observesWindow()) { + return new WindowDoFnAdapter<>(fn); + } else { + return new SimpleDoFnAdapter<>(fn); + } + } + + /** + * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link + * OldDoFn}. + */ + private static class SimpleDoFnAdapter extends OldDoFn { + private final DoFn fn; + private transient DoFnInvoker invoker; + + SimpleDoFnAdapter(DoFn fn) { + super(AggregatorRetriever.getDelegatingAggregators(fn)); + this.fn = fn; + this.invoker = DoFnInvokers.invokerFor(fn); + } + + @Override + public void setup() throws Exception { + this.invoker.invokeSetup(); + } + + @Override + public void startBundle(Context c) throws Exception { + fn.prepareForProcessing(); + invoker.invokeStartBundle(new ContextAdapter<>(fn, c)); + } + + @Override + public void finishBundle(Context c) throws Exception { + invoker.invokeFinishBundle(new ContextAdapter<>(fn, c)); + } + + @Override + public void teardown() throws Exception { + this.invoker.invokeTeardown(); + } + + @Override + public void processElement(ProcessContext c) throws Exception { + ProcessContextAdapter adapter = new ProcessContextAdapter<>(fn, c); + invoker.invokeProcessElement(adapter); + } + + @Override + public Duration getAllowedTimestampSkew() { + return fn.getAllowedTimestampSkew(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.delegate(fn); + } + + private void readObject(java.io.ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + this.invoker = DoFnInvokers.invokerFor(fn); + } + } + + /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */ + private static class WindowDoFnAdapter extends SimpleDoFnAdapter + implements OldDoFn.RequiresWindowAccess { + + WindowDoFnAdapter(DoFn fn) { + super(fn); + } + } + + /** + * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link + * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is + * unavailable. + */ + private static class ContextAdapter extends DoFn.Context + implements DoFnInvoker.ArgumentProvider { + + private OldDoFn.Context context; + + private ContextAdapter(DoFn fn, OldDoFn.Context context) { + fn.super(); + this.context = context; + super.setupDelegateAggregators(); + } + + @Override + public PipelineOptions getPipelineOptions() { + return context.getPipelineOptions(); + } + + @Override + public void output(OutputT output) { + context.output(output); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + context.outputWithTimestamp(output, timestamp); + } + + @Override + public void sideOutput(TupleTag tag, T output) { + context.sideOutput(tag, output); + } + + @Override + public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + context.sideOutputWithTimestamp(tag, output, timestamp); + } + + @Override + protected Aggregator createAggregator( + String name, + CombineFn combiner) { + return context.createAggregatorInternal(name, combiner); + } + + @Override + public BoundedWindow window() { + // The OldDoFn doesn't allow us to ask for these outside processElement, so this + // should be unreachable. + throw new UnsupportedOperationException( + "Can only get the window in processElement; elsewhere there is no defined window."); + } + + @Override + public Context context(DoFn doFn) { + return this; + } + + @Override + public ProcessContext processContext(DoFn doFn) { + throw new UnsupportedOperationException( + "Can only get a ProcessContext in processElement"); + } + + @Override + public OnTimerContext onTimerContext(DoFn doFn) { + throw new UnsupportedOperationException( + "Timers are not supported for OldDoFn"); + } + + @Override + public DoFn.InputProvider inputProvider() { + throw new UnsupportedOperationException("inputProvider() exists only for testing"); + } + + @Override + public DoFn.OutputReceiver outputReceiver() { + throw new UnsupportedOperationException("outputReceiver() exists only for testing"); + } + + @Override + public RestrictionTracker restrictionTracker() { + throw new UnsupportedOperationException("This is a non-splittable DoFn"); + } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException("State is not supported by this runner"); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException("Timers are not supported by this runner"); + } + } + + /** + * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method. + */ + private static class ProcessContextAdapter + extends DoFn.ProcessContext + implements DoFnInvoker.ArgumentProvider { + + private OldDoFn.ProcessContext context; + + private ProcessContextAdapter( + DoFn fn, OldDoFn.ProcessContext context) { + fn.super(); + this.context = context; + } + + @Override + public PipelineOptions getPipelineOptions() { + return context.getPipelineOptions(); + } + + @Override + public T sideInput(PCollectionView view) { + return context.sideInput(view); + } + + @Override + public void output(OutputT output) { + context.output(output); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + context.outputWithTimestamp(output, timestamp); + } + + @Override + public void sideOutput(TupleTag tag, T output) { + context.sideOutput(tag, output); + } + + @Override + public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + context.sideOutputWithTimestamp(tag, output, timestamp); + } + + @Override + protected Aggregator createAggregator( + String name, CombineFn combiner) { + return context.createAggregatorInternal(name, combiner); + } + + @Override + public InputT element() { + return context.element(); + } + + @Override + public Instant timestamp() { + return context.timestamp(); + } + + @Override + public PaneInfo pane() { + return context.pane(); + } + + @Override + public BoundedWindow window() { + return context.window(); + } + + @Override + public Context context(DoFn doFn) { + return this; + } + + @Override + public ProcessContext processContext(DoFn doFn) { + return this; + } + + @Override + public OnTimerContext onTimerContext(DoFn doFn) { + throw new UnsupportedOperationException("Timers are not supported for OldDoFn"); + } + + @Override + public DoFn.InputProvider inputProvider() { + throw new UnsupportedOperationException("inputProvider() exists only for testing"); + } + + @Override + public DoFn.OutputReceiver outputReceiver() { + throw new UnsupportedOperationException("outputReceiver() exists only for testing"); + } + + @Override + public RestrictionTracker restrictionTracker() { + throw new UnsupportedOperationException("This is a non-splittable DoFn"); + } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException("State is not supported by this runner"); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException("Timers are not supported by this runner"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 73286ad..10af29a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -323,7 +323,7 @@ class SimpleOldDoFnRunner implements DoFnRunner Aggregator createAggregatorInternal( + public Aggregator createAggregatorInternal( String name, CombineFn combiner) { checkNotNull(combiner, "Combiner passed to createAggregatorInternal cannot be null"); return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner); @@ -505,7 +505,7 @@ class SimpleOldDoFnRunner implements DoFnRunner Aggregator + public Aggregator createAggregatorInternal( String name, CombineFn combiner) { return context.createAggregatorInternal(name, combiner); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java index 97b67c6..ef01106 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java @@ -744,7 +744,7 @@ public class GroupAlsoByWindowsProperties { } @Override - protected Aggregator createAggregatorInternal( + public Aggregator createAggregatorInternal( String name, CombineFn combiner) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index ed200d5..2a4a68e 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -18,10 +18,10 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Map; +import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java index 7f6a436..a97bd46 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java @@ -18,10 +18,10 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Map; +import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java index 6afca38..53b9803 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java @@ -252,7 +252,7 @@ abstract class FlinkProcessContextBase public abstract void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp); @Override - protected Aggregator + public Aggregator createAggregatorInternal(String name, Combine.CombineFn combiner) { @SuppressWarnings("unchecked") SerializableFnAggregatorWrapper result = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 6729aaa..87b15a7 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -28,6 +28,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; @@ -40,7 +41,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java index ce47e22..b1d3ead 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java @@ -18,9 +18,10 @@ package org.apache.beam.sdk.transforms; import java.util.Collection; +import java.util.Map; /** - * An internal class for extracting {@link Aggregator Aggregators} from {@link OldDoFn DoFns}. + * An internal class for extracting {@link Aggregator Aggregators} from {@link DoFn DoFns}. */ public final class AggregatorRetriever { private AggregatorRetriever() { @@ -28,9 +29,17 @@ public final class AggregatorRetriever { } /** - * Returns the {@link Aggregator Aggregators} created by the provided {@link OldDoFn}. + * Returns the {@link Aggregator Aggregators} created by the provided {@link DoFn}. */ public static Collection> getAggregators(DoFn fn) { return fn.getAggregators(); } + + /** + * Returns the {@link DelegatingAggregator delegating aggregators} created by the provided {@link + * DoFn}. + */ + public static Map> getDelegatingAggregators(DoFn fn) { + return fn.aggregators; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java deleted file mode 100644 index 0a71faa..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import java.io.IOException; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn.Context; -import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; -import org.apache.beam.sdk.transforms.DoFn.ProcessContext; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; -import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}. - * - * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link - * DoFnInvoker}) rather than via {@link OldDoFn}. - */ -@Deprecated -public class DoFnAdapters { - /** Should not be instantiated. */ - private DoFnAdapters() {} - - /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */ - @SuppressWarnings({"unchecked", "rawtypes"}) - public static OldDoFn toOldDoFn(DoFn fn) { - DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass()); - if (signature.processElement().observesWindow()) { - return new WindowDoFnAdapter<>(fn); - } else { - return new SimpleDoFnAdapter<>(fn); - } - } - - /** - * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link - * OldDoFn}. - */ - private static class SimpleDoFnAdapter extends OldDoFn { - private final DoFn fn; - private transient DoFnInvoker invoker; - - SimpleDoFnAdapter(DoFn fn) { - super(fn.aggregators); - this.fn = fn; - this.invoker = DoFnInvokers.invokerFor(fn); - } - - @Override - public void setup() throws Exception { - this.invoker.invokeSetup(); - } - - @Override - public void startBundle(Context c) throws Exception { - fn.prepareForProcessing(); - invoker.invokeStartBundle(new ContextAdapter<>(fn, c)); - } - - @Override - public void finishBundle(Context c) throws Exception { - invoker.invokeFinishBundle(new ContextAdapter<>(fn, c)); - } - - @Override - public void teardown() throws Exception { - this.invoker.invokeTeardown(); - } - - @Override - public void processElement(ProcessContext c) throws Exception { - ProcessContextAdapter adapter = new ProcessContextAdapter<>(fn, c); - invoker.invokeProcessElement(adapter); - } - - @Override - public Duration getAllowedTimestampSkew() { - return fn.getAllowedTimestampSkew(); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.delegate(fn); - } - - private void readObject(java.io.ObjectInputStream in) - throws IOException, ClassNotFoundException { - in.defaultReadObject(); - this.invoker = DoFnInvokers.invokerFor(fn); - } - } - - /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */ - private static class WindowDoFnAdapter extends SimpleDoFnAdapter - implements OldDoFn.RequiresWindowAccess { - - WindowDoFnAdapter(DoFn fn) { - super(fn); - } - } - - /** - * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link - * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is - * unavailable. - */ - private static class ContextAdapter extends DoFn.Context - implements DoFnInvoker.ArgumentProvider { - - private OldDoFn.Context context; - - private ContextAdapter(DoFn fn, OldDoFn.Context context) { - fn.super(); - this.context = context; - super.setupDelegateAggregators(); - } - - @Override - public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); - } - - @Override - public void output(OutputT output) { - context.output(output); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - context.outputWithTimestamp(output, timestamp); - } - - @Override - public void sideOutput(TupleTag tag, T output) { - context.sideOutput(tag, output); - } - - @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - context.sideOutputWithTimestamp(tag, output, timestamp); - } - - @Override - protected Aggregator createAggregator( - String name, - CombineFn combiner) { - return context.createAggregatorInternal(name, combiner); - } - - @Override - public BoundedWindow window() { - // The OldDoFn doesn't allow us to ask for these outside processElement, so this - // should be unreachable. - throw new UnsupportedOperationException( - "Can only get the window in processElement; elsewhere there is no defined window."); - } - - @Override - public Context context(DoFn doFn) { - return this; - } - - @Override - public ProcessContext processContext(DoFn doFn) { - throw new UnsupportedOperationException( - "Can only get a ProcessContext in processElement"); - } - - @Override - public OnTimerContext onTimerContext(DoFn doFn) { - throw new UnsupportedOperationException( - "Timers are not supported for OldDoFn"); - } - - @Override - public DoFn.InputProvider inputProvider() { - throw new UnsupportedOperationException("inputProvider() exists only for testing"); - } - - @Override - public DoFn.OutputReceiver outputReceiver() { - throw new UnsupportedOperationException("outputReceiver() exists only for testing"); - } - - @Override - public RestrictionTracker restrictionTracker() { - throw new UnsupportedOperationException("This is a non-splittable DoFn"); - } - - @Override - public State state(String stateId) { - throw new UnsupportedOperationException("State is not supported by this runner"); - } - - @Override - public Timer timer(String timerId) { - throw new UnsupportedOperationException("Timers are not supported by this runner"); - } - } - - /** - * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method. - */ - private static class ProcessContextAdapter - extends DoFn.ProcessContext - implements DoFnInvoker.ArgumentProvider { - - private OldDoFn.ProcessContext context; - - private ProcessContextAdapter( - DoFn fn, OldDoFn.ProcessContext context) { - fn.super(); - this.context = context; - } - - @Override - public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); - } - - @Override - public T sideInput(PCollectionView view) { - return context.sideInput(view); - } - - @Override - public void output(OutputT output) { - context.output(output); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - context.outputWithTimestamp(output, timestamp); - } - - @Override - public void sideOutput(TupleTag tag, T output) { - context.sideOutput(tag, output); - } - - @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - context.sideOutputWithTimestamp(tag, output, timestamp); - } - - @Override - protected Aggregator createAggregator( - String name, CombineFn combiner) { - return context.createAggregatorInternal(name, combiner); - } - - @Override - public InputT element() { - return context.element(); - } - - @Override - public Instant timestamp() { - return context.timestamp(); - } - - @Override - public PaneInfo pane() { - return context.pane(); - } - - @Override - public BoundedWindow window() { - return context.window(); - } - - @Override - public Context context(DoFn doFn) { - return this; - } - - @Override - public ProcessContext processContext(DoFn doFn) { - return this; - } - - @Override - public OnTimerContext onTimerContext(DoFn doFn) { - throw new UnsupportedOperationException("Timers are not supported for OldDoFn"); - } - - @Override - public DoFn.InputProvider inputProvider() { - throw new UnsupportedOperationException("inputProvider() exists only for testing"); - } - - @Override - public DoFn.OutputReceiver outputReceiver() { - throw new UnsupportedOperationException("outputReceiver() exists only for testing"); - } - - @Override - public RestrictionTracker restrictionTracker() { - throw new UnsupportedOperationException("This is a non-splittable DoFn"); - } - - @Override - public State state(String stateId) { - throw new UnsupportedOperationException("State is not supported by this runner"); - } - - @Override - public Timer timer(String timerId) { - throw new UnsupportedOperationException("Timers are not supported by this runner"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java index 0aef552..7b04533 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -192,7 +192,7 @@ public abstract class OldDoFn implements Serializable, HasDispl * context */ @Experimental(Kind.AGGREGATOR) - protected abstract Aggregator + public abstract Aggregator createAggregatorInternal(String name, CombineFn combiner); /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java index 504480b..0db130d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java @@ -63,7 +63,7 @@ class NoOpOldDoFn extends OldDoFn { Instant timestamp) { } @Override - protected Aggregator + public Aggregator createAggregatorInternal(String name, CombineFn combiner) { return null; }