From: kenn@apache.org
To: commits@beam.incubator.apache.org
Reply-To: dev@beam.incubator.apache.org
Date: Thu, 03 Nov 2016 16:39:32 -0000
Message-Id: <2d8a0d48839d4e9f89bda46527e1466a@git.apache.org>
Subject: [1/2] incubator-beam git commit: [BEAM-79] Port Gearpump runner from OldDoFn to new DoFn

Repository: incubator-beam
Updated Branches:
  refs/heads/gearpump-runner 3933b5577 -> 323ec1188


[BEAM-79] Port Gearpump runner from OldDoFn to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/45570b9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/45570b9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/45570b9c

Branch: refs/heads/gearpump-runner
Commit: 45570b9c7ebb11080deca3346fc601c69796612a
Parents: 3933b55
Author: manuzhang
Authored: Mon Oct 31 11:52:22 2016 +0800
Committer: Kenneth Knowles
Committed: Thu Nov 3 09:38:41 2016 -0700

----------------------------------------------------------------------
 .../gearpump/GearpumpPipelineTranslator.java    |   2 +-
 .../translators/ParDoBoundMultiTranslator.java  |  17 +-
 .../translators/ParDoBoundTranslator.java       |   3 +-
 .../translators/functions/DoFnFunction.java     |  19 +-
 .../translators/utils/DoFnRunnerFactory.java    |  77 +++
 .../translators/utils/GearpumpDoFnRunner.java   | 516 -------------------
 .../utils/NoOpAggregatorFactory.java            |  41 ++
 7 files changed, 143 insertions(+), 532 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index 5045ae4..8588fff 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -108,7 +108,7 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
   @Override
   public void visitValue(PValue value, TransformTreeNode producer) {
-    LOG.info("visiting value {}", value);
+    LOG.debug("visiting value {}", value);
   }
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index 2b49684..54f1c3f 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -27,11 +27,11 @@ import java.util.Map;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
-import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
+import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -64,7 +64,7 @@ public class ParDoBoundMultiTranslator implements
     JavaStream, OutputT>>> outputStream = inputStream.flatMap(
         new DoFnMultiFunction<>(
             context.getPipelineOptions(),
-            transform.getFn(),
+            transform.getNewFn(),
             transform.getMainOutputTag(),
             transform.getSideOutputTags(),
             inputT.getWindowingStrategy(),
@@ -87,18 +87,19 @@ public class ParDoBoundMultiTranslator implements
       FlatMapFunction, WindowedValue, OutputT>>>,
       DoFnRunners.OutputManager {
-    private final DoFnRunner doFnRunner;
+    private final DoFnRunnerFactory doFnRunnerFactory;
+    private DoFnRunner doFnRunner;
     private final List, OutputT>>> outputs = Lists
         .newArrayList();
     public DoFnMultiFunction(
         GearpumpPipelineOptions pipelineOptions,
-        OldDoFn doFn,
+        DoFn doFn,
         TupleTag mainOutputTag,
         TupleTagList sideOutputTags,
         WindowingStrategy windowingStrategy,
         SideInputReader sideInputReader) {
-      this.doFnRunner = new GearpumpDoFnRunner<>(
+      this.doFnRunnerFactory = new DoFnRunnerFactory<>(
           pipelineOptions,
           doFn,
           sideInputReader,
@@ -106,12 +107,16 @@ public class ParDoBoundMultiTranslator implements
           mainOutputTag,
           sideOutputTags.getAll(),
           new NoOpStepContext(),
+          new NoOpAggregatorFactory(),
           windowingStrategy
       );
     }
     @Override
     public Iterator, OutputT>>> apply(WindowedValue wv) {
+      if (null == doFnRunner) {
+        doFnRunner = doFnRunnerFactory.createRunner();
+      }
       doFnRunner.startBundle();
       doFnRunner.processElement(wv);
       doFnRunner.finishBundle();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
index b97cbb4..a796c83 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.gearpump.translators;
 import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -39,7 +38,7 @@ public class ParDoBoundTranslator implements
   @Override
   public void translate(ParDo.Bound transform, TranslationContext context) {
-    OldDoFn doFn = transform.getFn();
+    DoFn doFn = transform.getNewFn();
     PCollection output = context.getOutput(transform);
     WindowingStrategy windowingStrategy = output.getWindowingStrategy();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 8d16356..42969fe 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -26,10 +26,10 @@ import java.util.List;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
-import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
+import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -44,17 +44,17 @@ import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
 public class DoFnFunction implements
     FlatMapFunction, WindowedValue>, DoFnRunners.OutputManager {
-  private final TupleTag mainTag = new TupleTag() {
-  };
-  private final DoFnRunner doFnRunner;
+  private final TupleTag mainTag = new TupleTag() {};
   private List> outputs = Lists.newArrayList();
+  private final DoFnRunnerFactory doFnRunnerFactory;
+  private DoFnRunner doFnRunner;
   public DoFnFunction(
       GearpumpPipelineOptions pipelineOptions,
-      OldDoFn doFn,
+      DoFn doFn,
       WindowingStrategy windowingStrategy,
       SideInputReader sideInputReader) {
-    this.doFnRunner = new GearpumpDoFnRunner<>(
+    this.doFnRunnerFactory = new DoFnRunnerFactory<>(
         pipelineOptions,
         doFn,
         sideInputReader,
@@ -62,6 +62,7 @@ public class DoFnFunction implements
         mainTag,
         TupleTagList.empty().getAll(),
         new NoOpStepContext(),
+        new NoOpAggregatorFactory(),
         windowingStrategy
     );
   }
@@ -70,6 +71,10 @@ public class DoFnFunction implements
   public Iterator> apply(WindowedValue value) {
     outputs = Lists.newArrayList();
+    if (null == doFnRunner) {
+      doFnRunner = doFnRunnerFactory.createRunner();
+    }
+
     doFnRunner.startBundle();
     doFnRunner.processElement(value);
     doFnRunner.finishBundle();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
new file mode 100644
index 0000000..7119a87
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.utils;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.SimpleDoFnRunner;
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * a serializable {@link SimpleDoFnRunner}.
+ */
+public class DoFnRunnerFactory implements Serializable {
+
+  private final DoFn fn;
+  private final transient PipelineOptions options;
+  private final SideInputReader sideInputReader;
+  private final DoFnRunners.OutputManager outputManager;
+  private final TupleTag mainOutputTag;
+  private final List> sideOutputTags;
+  private final ExecutionContext.StepContext stepContext;
+  private final AggregatorFactory aggregatorFactory;
+  private final WindowingStrategy windowingStrategy;
+
+  public DoFnRunnerFactory(
+      GearpumpPipelineOptions pipelineOptions,
+      DoFn doFn,
+      SideInputReader sideInputReader,
+      DoFnRunners.OutputManager outputManager,
+      TupleTag mainOutputTag,
+      List> sideOutputTags,
+      ExecutionContext.StepContext stepContext,
+      AggregatorFactory aggregatorFactory,
+      WindowingStrategy windowingStrategy) {
+    this.fn = doFn;
+    this.options = pipelineOptions;
+    this.sideInputReader = sideInputReader;
+    this.outputManager = outputManager;
+    this.mainOutputTag = mainOutputTag;
+    this.sideOutputTags = sideOutputTags;
+    this.stepContext = stepContext;
+    this.aggregatorFactory = aggregatorFactory;
+    this.windowingStrategy = windowingStrategy;
+  }
+
+  public DoFnRunner createRunner() {
+    return DoFnRunners.createDefault(options, fn, sideInputReader, outputManager, mainOutputTag,
+        sideOutputTags, stepContext, aggregatorFactory, windowingStrategy);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
deleted file mode 100644
index ec86a8d..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
+++ /dev/null
@@ -1,516 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators.utils;
-
-import static org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.SimpleDoFnRunner;
-import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ExecutionContext;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-
-import org.joda.time.Instant;
-
-
-/**
- * a serializable {@link SimpleDoFnRunner}.
- */
-public class GearpumpDoFnRunner implements DoFnRunner,
-    Serializable {
-
-  private final OldDoFn fn;
-  private final transient PipelineOptions options;
-  private final SideInputReader sideInputReader;
-  private final DoFnRunners.OutputManager outputManager;
-  private final TupleTag mainOutputTag;
-  private final List> sideOutputTags;
-  private final ExecutionContext.StepContext stepContext;
-  private final WindowFn windowFn;
-  private DoFnContext context;
-
-  public GearpumpDoFnRunner(
-      GearpumpPipelineOptions pipelineOptions,
-      OldDoFn doFn,
-      SideInputReader sideInputReader,
-      DoFnRunners.OutputManager outputManager,
-      TupleTag mainOutputTag,
-      List> sideOutputTags,
-      ExecutionContext.StepContext stepContext,
-      WindowingStrategy windowingStrategy) {
-    this.fn = doFn;
-    this.options = pipelineOptions;
-    this.sideInputReader = sideInputReader;
-    this.outputManager = outputManager;
-    this.mainOutputTag = mainOutputTag;
-    this.sideOutputTags = sideOutputTags;
-    this.stepContext = stepContext;
-    this.windowFn = windowingStrategy == null ? null : windowingStrategy.getWindowFn();
-  }
-
-  @Override
-  public void startBundle() {
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      if (null == context) {
-        this.context = new DoFnContext<>(
-            options,
-            fn,
-            sideInputReader,
-            outputManager,
-            mainOutputTag,
-            sideOutputTags,
-            stepContext,
-            windowFn
-        );
-      }
-      fn.startBundle(context);
-    } catch (Throwable t) {
-      // Exception in user code.
-      throw wrapUserCodeException(t);
-    }
-  }
-
-  @Override
-  public void processElement(WindowedValue elem) {
-    if (elem.getWindows().size() <= 1
-        || (!OldDoFn.RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
-        && context.sideInputReader.isEmpty())) {
-      invokeProcessElement(elem);
-    } else {
-      // We could modify the windowed value (and the processContext) to
-      // avoid repeated allocations, but this is more straightforward.
-      for (BoundedWindow window : elem.getWindows()) {
-        invokeProcessElement(WindowedValue.of(
-            elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
-      }
-    }
-  }
-
-  @Override
-  public void finishBundle() {
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.finishBundle(context);
-    } catch (Throwable t) {
-      // Exception in user code.
-      throw wrapUserCodeException(t);
-    }
-  }
-
-  private void invokeProcessElement(WindowedValue elem) {
-    final OldDoFn.ProcessContext processContext =
-        new DoFnProcessContext<>(fn, context, elem);
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.processElement(processContext);
-    } catch (Exception ex) {
-      throw wrapUserCodeException(ex);
-    }
-  }
-
-  private RuntimeException wrapUserCodeException(Throwable t) {
-    throw UserCodeException.wrapIf(!isSystemDoFn(), t);
-  }
-
-  private boolean isSystemDoFn() {
-    return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
-  }
-
-  /**
-   * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}.
-   *
-   * @param the type of the DoFn's (main) input elements
-   * @param the type of the DoFn's (main) output elements
-   */
-  private static class DoFnContext
-      extends OldDoFn.Context {
-    private static final int MAX_SIDE_OUTPUTS = 1000;
-
-    final transient PipelineOptions options;
-    final OldDoFn fn;
-    final SideInputReader sideInputReader;
-    final DoFnRunners.OutputManager outputManager;
-    final TupleTag mainOutputTag;
-    final ExecutionContext.StepContext stepContext;
-    final WindowFn windowFn;
-
-    /**
-     * The set of known output tags, some of which may be undeclared, so we can throw an
-     * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}.
-     */
-    private final Set> outputTags;
-
-    public DoFnContext(PipelineOptions options,
-        OldDoFn fn,
-        SideInputReader sideInputReader,
-        DoFnRunners.OutputManager outputManager,
-        TupleTag mainOutputTag,
-        List> sideOutputTags,
-        ExecutionContext.StepContext stepContext,
-        WindowFn windowFn) {
-      fn.super();
-      this.options = options;
-      this.fn = fn;
-      this.sideInputReader = sideInputReader;
-      this.outputManager = outputManager;
-      this.mainOutputTag = mainOutputTag;
-      this.outputTags = Sets.newHashSet();
-
-      outputTags.add(mainOutputTag);
-      for (TupleTag sideOutputTag : sideOutputTags) {
-        outputTags.add(sideOutputTag);
-      }
-
-      this.stepContext = stepContext;
-      this.windowFn = windowFn;
-      super.setupDelegateAggregators();
-    }
-
-    //////////////////////////////////////////////////////////////////////////////
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    WindowedValue makeWindowedValue(
-        T output, Instant timestamp, Collection windows, PaneInfo pane) {
-      final Instant inputTimestamp = timestamp;
-
-      if (timestamp == null) {
-        timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      }
-
-      if (windows == null) {
-        try {
-          // The windowFn can never succeed at accessing the element, so its type does not
-          // matter here
-          @SuppressWarnings("unchecked")
-          WindowFn objectWindowFn = (WindowFn) windowFn;
-          windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() {
-            @Override
-            public Object element() {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input element when none was available");
-            }
-
-            @Override
-            public Instant timestamp() {
-              if (inputTimestamp == null) {
-                throw new UnsupportedOperationException(
-                    "WindowFn attempted to access input timestamp when none was available");
-              }
-              return inputTimestamp;
-            }
-
-            @Override
-            public BoundedWindow window() {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input windows when none were available");
-            }
-          });
-        } catch (Exception e) {
-          throw UserCodeException.wrap(e);
-        }
-      }
-
-      return WindowedValue.of(output, timestamp, windows, pane);
-    }
-
-    public T sideInput(PCollectionView view, BoundedWindow mainInputWindow) {
-      if (!sideInputReader.contains(view)) {
-        throw new IllegalArgumentException("calling sideInput() with unknown view");
-      }
-      BoundedWindow sideInputWindow =
-          view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
-      return sideInputReader.get(view, sideInputWindow);
-    }
-
-    void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection windows,
-        PaneInfo pane) {
-      outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
-    }
-
-    void outputWindowedValue(WindowedValue windowedElem) {
-      outputManager.output(mainOutputTag, windowedElem);
-      if (stepContext != null) {
-        stepContext.noteOutput(windowedElem);
-      }
-    }
-
-    protected void sideOutputWindowedValue(TupleTag tag,
-        T output,
-        Instant timestamp,
-        Collection windows,
-        PaneInfo pane) {
-      sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
-    }
-
-    protected void sideOutputWindowedValue(TupleTag tag, WindowedValue windowedElem) {
-      if (!outputTags.contains(tag)) {
-        // This tag wasn't declared nor was it seen before during this execution.
-        // Thus, this must be a new, undeclared and unconsumed output.
-        // To prevent likely user errors, enforce the limit on the number of side
-        // outputs.
-        if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
-          throw new IllegalArgumentException(
-              "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
-        }
-        outputTags.add(tag);
-      }
-
-      outputManager.output(tag, windowedElem);
-      if (stepContext != null) {
-        stepContext.noteSideOutput(tag, windowedElem);
-      }
-    }
-
-    // Following implementations of output, outputWithTimestamp, and sideOutput
-    // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by
-    // ProcessContext's versions in DoFn.processElement.
-    @Override
-    public void output(OutputT output) {
-      outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public void sideOutput(TupleTag tag, T output) {
-      checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
-      sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) {
-      checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
-      sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    protected Aggregator createAggregatorInternal(
-        String name, Combine.CombineFn combiner) {
-      checkNotNull(combiner,
-          "Combiner passed to createAggregator cannot be null");
-      throw new UnsupportedOperationException("aggregator not supported in Gearpump runner");
-    }
-  }
-
-
-  /**
-   * A concrete implementation of {@code DoFn.ProcessContext} used for
-   * running a {@link DoFn} over a single element.
-   *
-   * @param the type of the DoFn's (main) input elements
-   * @param the type of the DoFn's (main) output elements
-   */
-  private static class DoFnProcessContext
-      extends OldDoFn.ProcessContext {
-
-
-    final OldDoFn fn;
-    final DoFnContext context;
-    final WindowedValue windowedValue;
-
-    public DoFnProcessContext(OldDoFn fn,
-        DoFnContext context,
-        WindowedValue windowedValue) {
-      fn.super();
-      this.fn = fn;
-      this.context = context;
-      this.windowedValue = windowedValue;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public InputT element() {
-      return windowedValue.getValue();
-    }
-
-    @Override
-    public T sideInput(PCollectionView view) {
-      checkNotNull(view, "View passed to sideInput cannot be null");
-      Iterator windowIter = windows().iterator();
-      BoundedWindow window;
-      if (!windowIter.hasNext()) {
-        if (context.windowFn instanceof GlobalWindows) {
-          // TODO: Remove this once GroupByKeyOnly no longer outputs elements
-          // without windows
-          window = GlobalWindow.INSTANCE;
-        } else {
-          throw new IllegalStateException(
-              "sideInput called when main input element is not in any windows");
-        }
-      } else {
-        window = windowIter.next();
-        if (windowIter.hasNext()) {
-          throw new IllegalStateException(
-              "sideInput called when main input element is in multiple windows");
-        }
-      }
-      return context.sideInput(view, window);
-    }
-
-    @Override
-    public BoundedWindow window() {
-      if (!(fn instanceof OldDoFn.RequiresWindowAccess)) {
-        throw new UnsupportedOperationException(
-            "window() is only available in the context of a DoFn marked as RequiresWindow.");
-      }
-      return Iterables.getOnlyElement(windows());
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return windowedValue.getPane();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.outputWindowedValue(windowedValue.withValue(output));
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWindowedValue(output, timestamp,
-          windowedValue.getWindows(), windowedValue.getPane());
-    }
-
-    @Override
-    public void sideOutput(TupleTag tag, T output) {
-      checkNotNull(tag, "Tag passed to sideOutput cannot be null");
-      context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
-    }
-
-    @Override
-    public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) {
-      checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
-      context.sideOutputWindowedValue(
-          tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
-    }
-
-    @Override
-    public Instant timestamp() {
-      return windowedValue.getTimestamp();
-    }
-
-    public Collection windows() {
-      return windowedValue.getWindows();
-    }
-
-    @Override
-    public WindowingInternals windowingInternals() {
-      return new WindowingInternals() {
-        @Override
-        public void outputWindowedValue(OutputT output, Instant timestamp,
-            Collection windows, PaneInfo pane) {
-          context.outputWindowedValue(output, timestamp, windows, pane);
-        }
-
-        @Override
-        public Collection windows() {
-          return windowedValue.getWindows();
-        }
-
-        @Override
-        public PaneInfo pane() {
-          return windowedValue.getPane();
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return context.stepContext.timerInternals();
-        }
-
-        @Override
-        public void writePCollectionViewData(
-            TupleTag tag,
-            Iterable> data,
-            Coder elemCoder) throws IOException {
-          @SuppressWarnings("unchecked")
-          Coder windowCoder = (Coder) context.windowFn.windowCoder();
-
-          context.stepContext.writePCollectionViewData(
-              tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)),
-              window(), windowCoder);
-        }
-
-        @Override
-        public StateInternals stateInternals() {
-          return context.stepContext.stateInternals();
-        }
-
-        @Override
-        public T sideInput(PCollectionView view, BoundedWindow mainInputWindow) {
-          return context.sideInput(view, mainInputWindow);
-        }
-      };
-    }
-
-    @Override
-    protected Aggregator
-        createAggregatorInternal(
-            String name, Combine.CombineFn combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
new file mode 100644
index 0000000..cd404a5
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.utils;
+
+import java.io.Serializable;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.util.ExecutionContext;
+
+/**
+ * no-op aggregator factory.
+ */
+public class NoOpAggregatorFactory implements AggregatorFactory, Serializable {
+
+  @Override
+  public Aggregator createAggregatorForDoFn(
+      Class fnClass,
+      ExecutionContext.StepContext stepContext,
+      String aggregatorName,
+      Combine.CombineFn combine) {
+    return null;
+  }
+}
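
The functional core of this commit is that DoFnFunction and DoFnMultiFunction no longer build a runner in their constructors: they keep a serializable DoFnRunnerFactory and call createRunner() on the first apply(). The following is a minimal, Beam-free Java sketch of that pattern, not code from the commit. Every name in it (LazyRunnerSketch, Runner, RunnerFactory, LazyFunction, roundTrip) is a hypothetical stand-in, and the Java serialization round trip only approximates, as an assumption, how a function might be shipped to Gearpump tasks. It also makes one detail visible: a field declared transient, like options in DoFnRunnerFactory, comes back as null on the deserialized copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical, Beam-free sketch of the "serializable factory + lazy runner" pattern. */
public class LazyRunnerSketch {

  /** Stand-in for a DoFnRunner: deliberately NOT Serializable. */
  static final class Runner {
    private final String options;

    Runner(String options) {
      this.options = options;
    }

    String process(String element) {
      return element + "@" + options;
    }
  }

  /** Stand-in for DoFnRunnerFactory: ships configuration, builds the runner on demand. */
  static final class RunnerFactory implements Serializable {
    private static final long serialVersionUID = 1L;
    // Like `transient PipelineOptions options` in the diff: skipped by Java serialization.
    private final transient String options;

    RunnerFactory(String options) {
      this.options = options;
    }

    Runner createRunner() {
      return new Runner(options);
    }
  }

  /** Stand-in for DoFnFunction: creates its runner on the first apply() call. */
  static final class LazyFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private final RunnerFactory factory;
    private transient Runner runner; // never serialized; rebuilt after deserialization

    LazyFunction(RunnerFactory factory) {
      this.factory = factory;
    }

    String apply(String element) {
      if (null == runner) {
        runner = factory.createRunner();
      }
      return runner.process(element);
    }
  }

  /** Serializes and deserializes a value, roughly what shipping a function to a worker does. */
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    LazyFunction local = new LazyFunction(new RunnerFactory("opts"));
    System.out.println(local.apply("a")); // a@opts

    LazyFunction remote = roundTrip(local);
    // The runner is rebuilt lazily, but the transient options field came back as null.
    System.out.println(remote.apply("b")); // b@null
  }
}
```

In the sketch the runner field is also marked transient, so a deserialized copy starts without a runner and builds one on its first call; in the diff that job falls to the `null == doFnRunner` check in apply().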