From: kenn@apache.org
To: commits@beam.incubator.apache.org
Date: Wed, 15 Jun 2016 22:21:07 -0000
Message-Id: <71f395306abf4203991ffd9e9704eabe@git.apache.org>
In-Reply-To: <24fef56b9e564865bf5f465806cb4180@git.apache.org>
References: <24fef56b9e564865bf5f465806cb4180@git.apache.org>
Subject: [2/4] incubator-beam git commit: Remove the DirectPipelineRunner from the Core SDK

Remove the DirectPipelineRunner from the Core SDK

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf476e12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf476e12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf476e12

Branch: refs/heads/master
Commit: bf476e12beedd4598c388fae02b662b3d4794aaa
Parents: b31be38
Author: Thomas Groh
Authored: Tue Jun 14 17:52:49 2016 -0700
Committer: Thomas Groh
Committed: Wed Jun 15 10:10:05 2016 -0700

----------------------------------------------------------------------
 .../examples/common/DataflowExampleUtils.java   |   11 +-
 runners/spark/pom.xml                           |    6 +
 .../translation/TransformTranslatorTest.java    |    4 +-
 .../java/org/apache/beam/sdk/io/PubsubIO.java   |    1 -
 .../main/java/org/apache/beam/sdk/io/Read.java  |   44 -
 .../java/org/apache/beam/sdk/io/TextIO.java     |    1 -
 .../beam/sdk/options/DirectPipelineOptions.java |    1 -
 .../sdk/runners/DirectPipelineRegistrar.java    |   55 -
 .../beam/sdk/runners/DirectPipelineRunner.java  | 1298 ------------------
 .../org/apache/beam/sdk/transforms/Flatten.java |   32 -
 .../org/apache/beam/sdk/transforms/ParDo.java   |  302 +--
 .../org/apache/beam/sdk/transforms/View.java    |   24 -
 .../sdk/util/DirectModeExecutionContext.java    |  130 --
 .../apache/beam/sdk/util/DoFnRunnerBase.java    |    1 -
 .../java/org/apache/beam/sdk/PipelineTest.java  |    4 +-
 .../io/BoundedReadFromUnboundedSourceTest.java  |    6 -
 .../runners/DirectPipelineRegistrarTest.java    |   71 -
 .../sdk/runners/DirectPipelineRunnerTest.java   |  222 ---
 .../beam/sdk/runners/PipelineRunnerTest.java    |    9 +-
 .../apache/beam/sdk/transforms/CombineTest.java |   21 -
 .../beam/sdk/transforms/GroupByKeyTest.java     |   13 +-
 .../apache/beam/sdk/transforms/ViewTest.java    |   29 +-
 .../main/java/common/DataflowExampleUtils.java  |   13 +-
 testing/travis/test_wordcount.sh                |    4 +-
 24 files changed, 40 insertions(+), 2262 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
index fb4f3bf..a0b7319 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.examples.common;

+import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@@ -25,7 +26,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.IntraBundleParallelization;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
@@ -315,11 +316,13 @@ public class DataflowExampleUtils {

   /**
    * Do some runner setup: check that the DirectPipelineRunner is not used in conjunction with
-   * streaming, and if streaming is specified, use the DataflowPipelineRunner. Return the streaming
-   * flag value.
+   * streaming, and if streaming is specified, use the DataflowPipelineRunner.
   */
  public void setupRunner() {
-    if (options.isStreaming() && options.getRunner() != DirectPipelineRunner.class) {
+    Class<? extends PipelineRunner<?>> runner = options.getRunner();
+    if (options.isStreaming()
+        && (runner.equals(DataflowPipelineRunner.class)
+            || runner.equals(BlockingDataflowPipelineRunner.class))) {
       // In order to cancel the pipelines automatically,
       // {@literal DataflowPipelineRunner} is forced to be used.
       options.setRunner(DataflowPipelineRunner.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 4110689..e7d0834 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -118,6 +118,12 @@
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>0.2.0-incubating-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index 4ef26d3..01f3070 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -21,12 +21,12 @@ package org.apache.beam.runners.spark.translation;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;

+import org.apache.beam.runners.direct.InProcessPipelineRunner;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.values.PCollection;

@@ -58,7 +58,7 @@ public class TransformTranslatorTest {
    */
   @Test
   public void testTextIOReadAndWriteTransforms() throws IOException {
-    String directOut = runPipeline(DirectPipelineRunner.class);
+    String directOut = runPipeline(InProcessPipelineRunner.class);
     String sparkOut = runPipeline(SparkPipelineRunner.class);
     List<String> directOutput =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 7e24253..2a5698c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -23,7 +23,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PubsubOptions;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
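Not part of the commit: with the direct runner removed from the core SDK, code that wants local execution now selects a runner explicitly, the way the test change above passes InProcessPipelineRunner.class to runPipeline. A minimal sketch, assuming the 2016-era API used throughout this diff (InProcessPipelineRunner comes from the beam-runners-direct-java module added to the Spark runner's test scope above; the class name RunnerSelectionSketch is hypothetical):

import org.apache.beam.runners.direct.InProcessPipelineRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerSelectionSketch {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    // Pick the runner explicitly instead of relying on the removed
    // core-SDK registrar's default.
    options.setRunner(InProcessPipelineRunner.class);
    Pipeline p = Pipeline.create(options);
    // ... apply transforms here ...
    p.run();
  }
}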
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index fb40063..c0440f2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -20,11 +20,9 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -32,9 +30,6 @@ import org.apache.beam.sdk.values.PInput; import org.joda.time.Duration; -import java.util.ArrayList; -import java.util.List; - import javax.annotation.Nullable; /** @@ -153,45 +148,6 @@ public class Read { .withLabel("Read Source")) .include(source); } - - static { - registerDefaultTransformEvaluator(); - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - private static void registerDefaultTransformEvaluator() { - DirectPipelineRunner.registerDefaultTransformEvaluator( - Bounded.class, - new DirectPipelineRunner.TransformEvaluator() { - @Override - public void evaluate( - Bounded transform, DirectPipelineRunner.EvaluationContext context) { - evaluateReadHelper(transform, context); - } - - private void evaluateReadHelper( - Read.Bounded transform, DirectPipelineRunner.EvaluationContext context) { - try { - List> output = new ArrayList<>(); - BoundedSource source = transform.getSource(); - try (BoundedSource.BoundedReader reader = - source.createReader(context.getPipelineOptions())) { - for (boolean available = reader.start(); - available; - available = reader.advance()) { - output.add( - DirectPipelineRunner.ValueWithMetadata.of( - WindowedValue.timestampedValueInGlobalWindow( - reader.getCurrent(), reader.getCurrentTimestamp()))); - } - } - context.setPCollectionValuesWithMetadata(context.getOutput(transform), output); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }); - } } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 13cb45e..bbef072 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -25,7 +25,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.IOChannelUtils; 
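The Read.Bounded evaluator deleted above drained a source via the BoundedReader protocol: start() positions the reader at the first record, advance() moves it forward, and getCurrent() reads at the current position. A standalone sketch of that loop, limited to the API calls visible in the deleted code (the class name BoundedReadSketch is hypothetical, and the sketch skips the ValueWithMetadata wrapping the evaluator performed):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

public class BoundedReadSketch {
  // Reads every element of a bounded source into memory.
  static <T> List<T> readAll(BoundedSource<T> source, PipelineOptions options)
      throws IOException {
    List<T> output = new ArrayList<>();
    // start() returns false for an empty source; advance() returns false
    // once the reader runs out of records.
    try (BoundedSource.BoundedReader<T> reader = source.createReader(options)) {
      for (boolean available = reader.start(); available; available = reader.advance()) {
        output.add(reader.getCurrent());
      }
    }
    return output;
  }
}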
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java index 4cdc0ca..c2095e3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.options; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.values.PCollection; import com.fasterxml.jackson.annotation.JsonIgnore; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java deleted file mode 100644 index 7dd0fdd..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners; - -import org.apache.beam.sdk.options.DirectPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; - -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableList; - -/** - * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for - * the {@link DirectPipeline}. - */ -public class DirectPipelineRegistrar { - private DirectPipelineRegistrar() { } - - /** - * Register the {@link DirectPipelineRunner}. - */ - @AutoService(PipelineRunnerRegistrar.class) - public static class Runner implements PipelineRunnerRegistrar { - @Override - public Iterable>> getPipelineRunners() { - return ImmutableList.>>of(DirectPipelineRunner.class); - } - } - - /** - * Register the {@link DirectPipelineOptions}. 
- */ - @AutoService(PipelineOptionsRegistrar.class) - public static class Options implements PipelineOptionsRegistrar { - @Override - public Iterable> getPipelineOptions() { - return ImmutableList.>of(DirectPipelineOptions.class); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java deleted file mode 100644 index 1eb25c5..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java +++ /dev/null @@ -1,1298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners; - -import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineVisitor; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.FileBasedSink; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.DirectPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Partition; -import org.apache.beam.sdk.transforms.Partition.PartitionFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.AppliedCombineFn; -import org.apache.beam.sdk.util.AssignWindows; -import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly; -import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; -import org.apache.beam.sdk.util.IOChannelUtils; -import 
org.apache.beam.sdk.util.MapAggregatorValues; -import org.apache.beam.sdk.util.PerKeyCombineFnRunner; -import org.apache.beam.sdk.util.PerKeyCombineFnRunners; -import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.common.Counter; -import org.apache.beam.sdk.util.common.CounterSet; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TypedPValue; - -import com.google.common.base.Function; -import com.google.common.collect.Lists; - -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; - -/** - * Executes the operations in the pipeline directly, in this process, without - * any optimization. Useful for small local execution and tests. - * - *
<p>Throws an exception from {@link #run} if execution fails.
- *
- * <p><h3>Permissions</h3>
- * When reading from a Dataflow source or writing to a Dataflow sink using
- * {@code DirectPipelineRunner}, the Cloud Platform account that you configured with the
- * gcloud executable will need access to the corresponding source/sink.
- *
- * <p>
Please see Google Cloud - * Dataflow Security and Permissions for more details. - */ -@SuppressWarnings({"rawtypes", "unchecked"}) -public class DirectPipelineRunner - extends PipelineRunner { - private static final Logger LOG = LoggerFactory.getLogger(DirectPipelineRunner.class); - - /** - * A source of random data, which can be seeded if determinism is desired. - */ - private Random rand; - - /** - * A map from PTransform class to the corresponding - * TransformEvaluator to use to evaluate that transform. - * - *
<p>A static map that contains system-wide defaults.
- */
- private static Map<Class, TransformEvaluator> defaultTransformEvaluators =
-     new HashMap<>();
-
- /**
- * A map from PTransform class to the corresponding
- * TransformEvaluator to use to evaluate that transform.
- *
- * <p>
An instance map that contains bindings for this DirectPipelineRunner. - * Bindings in this map override those in the default map. - */ - private Map localTransformEvaluators = - new HashMap<>(); - - /** - * Records that instances of the specified PTransform class - * should be evaluated by default by the corresponding - * TransformEvaluator. - */ - public static > - void registerDefaultTransformEvaluator( - Class transformClass, - TransformEvaluator transformEvaluator) { - if (defaultTransformEvaluators.put(transformClass, transformEvaluator) - != null) { - throw new IllegalArgumentException( - "defining multiple evaluators for " + transformClass); - } - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Records that instances of the specified PTransform class - * should be evaluated by the corresponding TransformEvaluator. - * Overrides any bindings specified by - * {@link #registerDefaultTransformEvaluator}. - */ - public > - void registerTransformEvaluator( - Class transformClass, - TransformEvaluator transformEvaluator) { - if (localTransformEvaluators.put(transformClass, transformEvaluator) - != null) { - throw new IllegalArgumentException( - "defining multiple evaluators for " + transformClass); - } - } - - /** - * Returns the TransformEvaluator to use for instances of the - * specified PTransform class, or null if none registered. - */ - public > - TransformEvaluator getTransformEvaluator(Class transformClass) { - TransformEvaluator transformEvaluator = - localTransformEvaluators.get(transformClass); - if (transformEvaluator == null) { - transformEvaluator = defaultTransformEvaluators.get(transformClass); - } - return transformEvaluator; - } - - /** - * Constructs a DirectPipelineRunner from the given options. - */ - public static DirectPipelineRunner fromOptions(PipelineOptions options) { - DirectPipelineOptions directOptions = - PipelineOptionsValidator.validate(DirectPipelineOptions.class, options); - LOG.debug("Creating DirectPipelineRunner"); - return new DirectPipelineRunner(directOptions); - } - - /** - * Enable runtime testing to verify that all functions and {@link Coder} - * instances can be serialized. - * - *
<p>Enabled by default.
- *
- * <p>
This method modifies the {@code DirectPipelineRunner} instance and - * returns itself. - */ - public DirectPipelineRunner withSerializabilityTesting(boolean enable) { - this.testSerializability = enable; - return this; - } - - /** - * Enable runtime testing to verify that all values can be encoded. - * - *
<p>Enabled by default.
- *
- * <p>
This method modifies the {@code DirectPipelineRunner} instance and - * returns itself. - */ - public DirectPipelineRunner withEncodabilityTesting(boolean enable) { - this.testEncodability = enable; - return this; - } - - /** - * Enable runtime testing to verify that functions do not depend on order - * of the elements. - * - *
<p>This is accomplished by randomizing the order of elements.
- *
- * <p>Enabled by default.
- *
- * <p>
This method modifies the {@code DirectPipelineRunner} instance and - * returns itself. - */ - public DirectPipelineRunner withUnorderednessTesting(boolean enable) { - this.testUnorderedness = enable; - return this; - } - - @Override - public OutputT apply( - PTransform transform, InputT input) { - if (transform instanceof Combine.GroupedValues) { - return (OutputT) applyTestCombine((Combine.GroupedValues) transform, (PCollection) input); - } else if (transform instanceof TextIO.Write.Bound) { - return (OutputT) applyTextIOWrite((TextIO.Write.Bound) transform, (PCollection) input); - } else if (transform instanceof AvroIO.Write.Bound) { - return (OutputT) applyAvroIOWrite((AvroIO.Write.Bound) transform, (PCollection) input); - } else if (transform instanceof GroupByKey) { - return (OutputT) - ((PCollection) input).apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform)); - } else if (transform instanceof Window.Bound) { - return (OutputT) - ((PCollection) input).apply(new AssignWindowsAndSetStrategy((Window.Bound) transform)); - } else { - return super.apply(transform, input); - } - } - - private PCollection> applyTestCombine( - Combine.GroupedValues transform, - PCollection>> input) { - - PCollection> output = input - .apply(ParDo.of(TestCombineDoFn.create(transform, input, testSerializability, rand)) - .withSideInputs(transform.getSideInputs())); - - try { - output.setCoder(transform.getDefaultOutputCoder(input)); - } catch (CannotProvideCoderException exc) { - // let coder inference occur later, if it can - } - return output; - } - - private static class ElementProcessingOrderPartitionFn implements PartitionFn { - private int elementNumber; - @Override - public int partitionFor(T elem, int numPartitions) { - return elementNumber++ % numPartitions; - } - } - - /** - * Applies TextIO.Write honoring user requested sharding controls (i.e. withNumShards) - * by applying a partition function based upon the number of shards the user requested. - */ - private static class DirectTextIOWrite extends PTransform, PDone> { - private final TextIO.Write.Bound transform; - - private DirectTextIOWrite(TextIO.Write.Bound transform) { - this.transform = transform; - } - - @Override - public PDone apply(PCollection input) { - checkState(transform.getNumShards() > 1, - "DirectTextIOWrite is expected to only be used when sharding controls are required."); - - // Evenly distribute all the elements across the partitions. - PCollectionList partitionedElements = - input.apply(Partition.of(transform.getNumShards(), - new ElementProcessingOrderPartitionFn())); - - // For each input PCollection partition, create a write transform that represents - // one of the specific shards. - for (int i = 0; i < transform.getNumShards(); ++i) { - /* - * This logic mirrors the file naming strategy within - * {@link FileBasedSink#generateDestinationFilenames()} - */ - String outputFilename = IOChannelUtils.constructName( - transform.getFilenamePrefix(), - transform.getShardNameTemplate(), - getFileExtension(transform.getFilenameSuffix()), - i, - transform.getNumShards()); - - String transformName = String.format("%s(Shard:%s)", transform.getName(), i); - partitionedElements.get(i).apply(transformName, - transform.withNumShards(1).withShardNameTemplate("").withSuffix("").to(outputFilename)); - } - return PDone.in(input.getPipeline()); - } - } - - /** - * Returns the file extension to be used. If the user did not request a file - * extension then this method returns the empty string. 
Otherwise this method
- * adds a {@code "."} to the beginning of the user's extension if one is not present.
- *
- * <p>
This is copied from {@link FileBasedSink} to not expose it. - */ - private static String getFileExtension(String usersExtension) { - if (usersExtension == null || usersExtension.isEmpty()) { - return ""; - } - if (usersExtension.startsWith(".")) { - return usersExtension; - } - return "." + usersExtension; - } - - /** - * Apply the override for TextIO.Write.Bound if the user requested sharding controls - * greater than one. - */ - private PDone applyTextIOWrite(TextIO.Write.Bound transform, PCollection input) { - if (transform.getNumShards() <= 1) { - // By default, the DirectPipelineRunner outputs to only 1 shard. Since the user never - // requested sharding controls greater than 1, we default to outputting to 1 file. - return super.apply(transform.withNumShards(1), input); - } - return input.apply(new DirectTextIOWrite<>(transform)); - } - - /** - * Applies AvroIO.Write honoring user requested sharding controls (i.e. withNumShards) - * by applying a partition function based upon the number of shards the user requested. - */ - private static class DirectAvroIOWrite extends PTransform, PDone> { - private final AvroIO.Write.Bound transform; - - private DirectAvroIOWrite(AvroIO.Write.Bound transform) { - this.transform = transform; - } - - @Override - public PDone apply(PCollection input) { - checkState(transform.getNumShards() > 1, - "DirectAvroIOWrite is expected to only be used when sharding controls are required."); - - // Evenly distribute all the elements across the partitions. - PCollectionList partitionedElements = - input.apply(Partition.of(transform.getNumShards(), - new ElementProcessingOrderPartitionFn())); - - // For each input PCollection partition, create a write transform that represents - // one of the specific shards. - for (int i = 0; i < transform.getNumShards(); ++i) { - /* - * This logic mirrors the file naming strategy within - * {@link FileBasedSink#generateDestinationFilenames()} - */ - String outputFilename = IOChannelUtils.constructName( - transform.getFilenamePrefix(), - transform.getShardNameTemplate(), - getFileExtension(transform.getFilenameSuffix()), - i, - transform.getNumShards()); - - String transformName = String.format("%s(Shard:%s)", transform.getName(), i); - partitionedElements.get(i).apply(transformName, - transform.withNumShards(1).withShardNameTemplate("").withSuffix("").to(outputFilename)); - } - return PDone.in(input.getPipeline()); - } - } - - private static class AssignWindowsAndSetStrategy - extends PTransform, PCollection> { - - private final Window.Bound wrapped; - - public AssignWindowsAndSetStrategy(Window.Bound wrapped) { - this.wrapped = wrapped; - } - - @Override - public PCollection apply(PCollection input) { - WindowingStrategy outputStrategy = - wrapped.getOutputStrategyInternal(input.getWindowingStrategy()); - - WindowFn windowFn = - (WindowFn) outputStrategy.getWindowFn(); - - // If the Window.Bound transform only changed parts other than the WindowFn, then - // we skip AssignWindows even though it should be harmless in a perfect world. - // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly - // crash if another GBK is performed without explicitly setting the WindowFn. So we skip - // AssignWindows in this case. 
- if (wrapped.getWindowFn() == null) { - return input.apply("Identity", ParDo.of(new IdentityFn())) - .setWindowingStrategyInternal(outputStrategy); - } else { - return input - .apply("AssignWindows", new AssignWindows(windowFn)) - .setWindowingStrategyInternal(outputStrategy); - } - } - } - - private static class IdentityFn extends DoFn { - @Override - public void processElement(ProcessContext c) { - c.output(c.element()); - } - } - - /** - * Apply the override for AvroIO.Write.Bound if the user requested sharding controls - * greater than one. - */ - private PDone applyAvroIOWrite(AvroIO.Write.Bound transform, PCollection input) { - if (transform.getNumShards() <= 1) { - // By default, the DirectPipelineRunner outputs to only 1 shard. Since the user never - // requested sharding controls greater than 1, we default to outputting to 1 file. - return super.apply(transform.withNumShards(1), input); - } - return input.apply(new DirectAvroIOWrite<>(transform)); - } - - /** - * The implementation may split the {@link KeyedCombineFn} into ADD, MERGE and EXTRACT phases ( - * see {@code org.apache.beam.sdk.runners.worker.CombineValuesFn}). In order to emulate - * this for the {@link DirectPipelineRunner} and provide an experience closer to the service, go - * through heavy serializability checks for the equivalent of the results of the ADD phase, but - * after the {@link org.apache.beam.sdk.transforms.GroupByKey} shuffle, and the MERGE - * phase. Doing these checks ensure that not only is the accumulator coder serializable, but - * the accumulator coder can actually serialize the data in question. - */ - public static class TestCombineDoFn - extends DoFn>, KV> { - private final PerKeyCombineFnRunner fnRunner; - private final Coder accumCoder; - private final boolean testSerializability; - private final Random rand; - - public static TestCombineDoFn create( - Combine.GroupedValues transform, - PCollection>> input, - boolean testSerializability, - Random rand) { - - AppliedCombineFn fn = transform.getAppliedFn( - input.getPipeline().getCoderRegistry(), input.getCoder(), input.getWindowingStrategy()); - - return new TestCombineDoFn( - PerKeyCombineFnRunners.create(fn.getFn()), - fn.getAccumulatorCoder(), - testSerializability, - rand); - } - - public TestCombineDoFn( - PerKeyCombineFnRunner fnRunner, - Coder accumCoder, - boolean testSerializability, - Random rand) { - this.fnRunner = fnRunner; - this.accumCoder = accumCoder; - this.testSerializability = testSerializability; - this.rand = rand; - - // Check that this does not crash, specifically to catch anonymous CustomCoder subclasses. - this.accumCoder.getEncodingId(); - } - - @Override - public void processElement(ProcessContext c) throws Exception { - K key = c.element().getKey(); - Iterable values = c.element().getValue(); - List groupedPostShuffle = - ensureSerializableByCoder(ListCoder.of(accumCoder), - addInputsRandomly(fnRunner, key, values, rand, c), - "After addInputs of KeyedCombineFn " + fnRunner.fn().toString()); - AccumT merged = - ensureSerializableByCoder(accumCoder, - fnRunner.mergeAccumulators(key, groupedPostShuffle, c), - "After mergeAccumulators of KeyedCombineFn " + fnRunner.fn().toString()); - // Note: The serializability of KV is ensured by the - // runner itself, since it's a transform output. - c.output(KV.of(key, fnRunner.extractOutput(key, merged, c))); - } - - /** - * Create a random list of accumulators from the given list of values. - * - *
<p>
Visible for testing purposes only. - */ - public static List addInputsRandomly( - PerKeyCombineFnRunner fnRunner, - K key, - Iterable values, - Random random, - DoFn.ProcessContext c) { - List out = new ArrayList(); - int i = 0; - AccumT accumulator = fnRunner.createAccumulator(key, c); - boolean hasInput = false; - - for (InputT value : values) { - accumulator = fnRunner.addInput(key, accumulator, value, c); - hasInput = true; - - // For each index i, flip a 1/2^i weighted coin for whether to - // create a new accumulator after index i is added, i.e. [0] - // is guaranteed, [1] is an even 1/2, [2] is 1/4, etc. The - // goal is to partition the inputs into accumulators, and make - // the accumulators potentially lumpy. Also compact about half - // of the accumulators. - if (i == 0 || random.nextInt(1 << Math.min(i, 30)) == 0) { - if (i % 2 == 0) { - accumulator = fnRunner.compact(key, accumulator, c); - } - out.add(accumulator); - accumulator = fnRunner.createAccumulator(key, c); - hasInput = false; - } - i++; - } - if (hasInput) { - out.add(accumulator); - } - - Collections.shuffle(out, random); - return out; - } - - public T ensureSerializableByCoder( - Coder coder, T value, String errorContext) { - if (testSerializability) { - return SerializableUtils.ensureSerializableByCoder( - coder, value, errorContext); - } - return value; - } - } - - @Override - public EvaluationResults run(Pipeline pipeline) { - LOG.info("Executing pipeline using the DirectPipelineRunner."); - - Evaluator evaluator = new Evaluator(rand); - evaluator.run(pipeline); - - // Log all counter values for debugging purposes. - for (Counter counter : evaluator.getCounters()) { - LOG.info("Final aggregator value: {}", counter); - } - - LOG.info("Pipeline execution complete."); - - return evaluator; - } - - /** - * An evaluator of a PTransform. - */ - public interface TransformEvaluator { - public void evaluate(TransformT transform, - EvaluationContext context); - } - - /** - * The interface provided to registered callbacks for interacting - * with the {@code DirectPipelineRunner}, including reading and writing the - * values of {@link PCollection}s and {@link PCollectionView}s. - */ - public interface EvaluationResults extends PipelineResult { - /** - * Retrieves the value of the given PCollection. - * Throws an exception if the PCollection's value hasn't already been set. - */ - List getPCollection(PCollection pc); - - /** - * Retrieves the windowed value of the given PCollection. - * Throws an exception if the PCollection's value hasn't already been set. - */ - List> getPCollectionWindowedValues(PCollection pc); - - /** - * Retrieves the values of each PCollection in the given - * PCollectionList. Throws an exception if the PCollectionList's - * value hasn't already been set. - */ - List> getPCollectionList(PCollectionList pcs); - - /** - * Retrieves the values indicated by the given {@link PCollectionView}. - * Note that within the {@link org.apache.beam.sdk.transforms.DoFn.Context} - * implementation a {@link PCollectionView} should convert from this representation to a - * suitable side input value. - */ - Iterable> getPCollectionView(PCollectionView view); - } - - /** - * An immutable (value, timestamp) pair, along with other metadata necessary - * for the implementation of {@code DirectPipelineRunner}. - */ - public static class ValueWithMetadata { - /** - * Returns a new {@code ValueWithMetadata} with the {@code WindowedValue}. - * Key is null. 
- */ - public static ValueWithMetadata of(WindowedValue windowedValue) { - return new ValueWithMetadata<>(windowedValue, null); - } - - /** - * Returns a new {@code ValueWithMetadata} with the implicit key associated - * with this value set. The key is the last key grouped by in the chain of - * productions that produced this element. - * These keys are used internally by {@link DirectPipelineRunner} for keeping - * persisted state separate across keys. - */ - public ValueWithMetadata withKey(Object key) { - return new ValueWithMetadata<>(windowedValue, key); - } - - /** - * Returns a new {@code ValueWithMetadata} that is a copy of this one, but with - * a different value. - */ - public ValueWithMetadata withValue(T value) { - return new ValueWithMetadata(windowedValue.withValue(value), getKey()); - } - - /** - * Returns the {@code WindowedValue} associated with this element. - */ - public WindowedValue getWindowedValue() { - return windowedValue; - } - - /** - * Returns the value associated with this element. - * - * @see #withValue - */ - public V getValue() { - return windowedValue.getValue(); - } - - /** - * Returns the timestamp associated with this element. - */ - public Instant getTimestamp() { - return windowedValue.getTimestamp(); - } - - /** - * Returns the collection of windows this element has been placed into. May - * be null if the {@code PCollection} this element is in has not yet been - * windowed. - * - * @see #getWindows() - */ - public Collection getWindows() { - return windowedValue.getWindows(); - } - - - /** - * Returns the key associated with this element. May be null if the - * {@code PCollection} this element is in is not keyed. - * - * @see #withKey - */ - public Object getKey() { - return key; - } - - //////////////////////////////////////////////////////////////////////////// - - private final Object key; - private final WindowedValue windowedValue; - - private ValueWithMetadata(WindowedValue windowedValue, - Object key) { - this.windowedValue = windowedValue; - this.key = key; - } - } - - /** - * The interface provided to registered callbacks for interacting - * with the {@code DirectPipelineRunner}, including reading and writing the - * values of {@link PCollection}s and {@link PCollectionView}s. - */ - public interface EvaluationContext extends EvaluationResults { - /** - * Returns the configured pipeline options. - */ - DirectPipelineOptions getPipelineOptions(); - - /** - * Returns the input of the currently being processed transform. - */ - InputT getInput(PTransform transform); - - /** - * Returns the output of the currently being processed transform. - */ - OutputT getOutput(PTransform transform); - - /** - * Sets the value of the given PCollection, where each element also has a timestamp - * and collection of windows. - * Throws an exception if the PCollection's value has already been set. - */ - void setPCollectionValuesWithMetadata( - PCollection pc, List> elements); - - /** - * Sets the value of the given PCollection, where each element also has a timestamp - * and collection of windows. - * Throws an exception if the PCollection's value has already been set. - */ - void setPCollectionWindowedValue(PCollection pc, List> elements); - - /** - * Shorthand for setting the value of a PCollection where the elements do not have - * timestamps or windows. - * Throws an exception if the PCollection's value has already been set. 
- */ - void setPCollection(PCollection pc, List elements); - - /** - * Retrieves the value of the given PCollection, along with element metadata - * such as timestamps and windows. - * Throws an exception if the PCollection's value hasn't already been set. - */ - List> getPCollectionValuesWithMetadata(PCollection pc); - - /** - * Sets the value associated with the given {@link PCollectionView}. - * Throws an exception if the {@link PCollectionView}'s value has already been set. - */ - void setPCollectionView( - PCollectionView pc, - Iterable> value); - - /** - * Ensures that the element is encodable and decodable using the - * TypePValue's coder, by encoding it and decoding it, and - * returning the result. - */ - T ensureElementEncodable(TypedPValue pvalue, T element); - - /** - * If the evaluation context is testing unorderedness, - * randomly permutes the order of the elements, in a - * copy if !inPlaceAllowed, and returns the permuted list, - * otherwise returns the argument unchanged. - */ - List randomizeIfUnordered(List elements, - boolean inPlaceAllowed); - - /** - * If the evaluation context is testing serializability, ensures - * that the argument function is serializable and deserializable - * by encoding it and then decoding it, and returning the result. - * Otherwise returns the argument unchanged. - */ - FunctionT ensureSerializable(FunctionT fn); - - /** - * If the evaluation context is testing serializability, ensures - * that the argument Coder is serializable and deserializable - * by encoding it and then decoding it, and returning the result. - * Otherwise returns the argument unchanged. - */ - Coder ensureCoderSerializable(Coder coder); - - /** - * If the evaluation context is testing serializability, ensures - * that the given data is serializable and deserializable with the - * given Coder by encoding it and then decoding it, and returning - * the result. Otherwise returns the argument unchanged. - * - *
<p>
Error context is prefixed to any thrown exceptions. - */ - T ensureSerializableByCoder(Coder coder, - T data, String errorContext); - - /** - * Returns a mutator, which can be used to add additional counters to - * this EvaluationContext. - */ - CounterSet.AddCounterMutator getAddCounterMutator(); - - /** - * Gets the step name for this transform. - */ - public String getStepName(PTransform transform); - } - - - ///////////////////////////////////////////////////////////////////////////// - - class Evaluator extends PipelineVisitor.Defaults implements EvaluationContext { - /** - * A map from PTransform to the step name of that transform. This is the internal name for the - * transform (e.g. "s2"). - */ - private final Map, String> stepNames = new HashMap<>(); - private final Map store = new HashMap<>(); - private final CounterSet counters = new CounterSet(); - private AppliedPTransform currentTransform; - - private Map, Collection>> aggregatorSteps = null; - - /** - * A map from PTransform to the full name of that transform. This is the user name of the - * transform (e.g. "RemoveDuplicates/Combine/GroupByKey"). - */ - private final Map, String> fullNames = new HashMap<>(); - - private Random rand; - - public Evaluator() { - this(new Random()); - } - - public Evaluator(Random rand) { - this.rand = rand; - } - - public void run(Pipeline pipeline) { - pipeline.traverseTopologically(this); - aggregatorSteps = new AggregatorPipelineExtractor(pipeline).getAggregatorSteps(); - } - - @Override - public DirectPipelineOptions getPipelineOptions() { - return options; - } - - @Override - public InputT getInput(PTransform transform) { - checkArgument(currentTransform != null && currentTransform.getTransform() == transform, - "can only be called with current transform"); - return (InputT) currentTransform.getInput(); - } - - @Override - public OutputT getOutput(PTransform transform) { - checkArgument(currentTransform != null && currentTransform.getTransform() == transform, - "can only be called with current transform"); - return (OutputT) currentTransform.getOutput(); - } - - @Override - public void visitPrimitiveTransform(TransformTreeNode node) { - PTransform transform = node.getTransform(); - fullNames.put(transform, node.getFullName()); - TransformEvaluator evaluator = - getTransformEvaluator(transform.getClass()); - if (evaluator == null) { - throw new IllegalStateException( - "no evaluator registered for " + transform); - } - LOG.debug("Evaluating {}", transform); - currentTransform = AppliedPTransform.of( - node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform); - evaluator.evaluate(transform, this); - currentTransform = null; - } - - @Override - public void visitValue(PValue value, TransformTreeNode producer) { - LOG.debug("Checking evaluation of {}", value); - if (value.getProducingTransformInternal() == null) { - throw new RuntimeException( - "internal error: expecting a PValue to have a producingTransform"); - } - if (!producer.isCompositeNode()) { - // Verify that primitive transform outputs are already computed. - getPValue(value); - } - } - - /** - * Sets the value of the given PValue. - * Throws an exception if the PValue's value has already been set. - */ - void setPValue(PValue pvalue, Object contents) { - if (store.containsKey(pvalue)) { - throw new IllegalStateException( - "internal error: setting the value of " + pvalue - + " more than once"); - } - store.put(pvalue, contents); - } - - /** - * Retrieves the value of the given PValue. 
- * Throws an exception if the PValue's value hasn't already been set. - */ - Object getPValue(PValue pvalue) { - if (!store.containsKey(pvalue)) { - throw new IllegalStateException( - "internal error: getting the value of " + pvalue - + " before it has been computed"); - } - return store.get(pvalue); - } - - /** - * Convert a list of T to a list of {@code ValueWithMetadata}, with a timestamp of 0 - * and null windows. - */ - List> toValueWithMetadata(List values) { - List> result = new ArrayList<>(values.size()); - for (T value : values) { - result.add(ValueWithMetadata.of(WindowedValue.valueInGlobalWindow(value))); - } - return result; - } - - /** - * Convert a list of {@code WindowedValue} to a list of {@code ValueWithMetadata}. - */ - List> toValueWithMetadataFromWindowedValue( - List> values) { - List> result = new ArrayList<>(values.size()); - for (WindowedValue value : values) { - result.add(ValueWithMetadata.of(value)); - } - return result; - } - - @Override - public void setPCollection(PCollection pc, List elements) { - setPCollectionValuesWithMetadata(pc, toValueWithMetadata(elements)); - } - - @Override - public void setPCollectionWindowedValue( - PCollection pc, List> elements) { - setPCollectionValuesWithMetadata(pc, toValueWithMetadataFromWindowedValue(elements)); - } - - @Override - public void setPCollectionValuesWithMetadata( - PCollection pc, List> elements) { - LOG.debug("Setting {} = {}", pc, elements); - ensurePCollectionEncodable(pc, elements); - setPValue(pc, elements); - } - - @Override - public void setPCollectionView( - PCollectionView view, - Iterable> value) { - LOG.debug("Setting {} = {}", view, value); - setPValue(view, value); - } - - /** - * Retrieves the value of the given {@link PCollection}. - * Throws an exception if the {@link PCollection}'s value hasn't already been set. - */ - @Override - public List getPCollection(PCollection pc) { - List result = new ArrayList<>(); - for (ValueWithMetadata elem : getPCollectionValuesWithMetadata(pc)) { - result.add(elem.getValue()); - } - return result; - } - - @Override - public List> getPCollectionWindowedValues(PCollection pc) { - return Lists.transform( - getPCollectionValuesWithMetadata(pc), - new Function, WindowedValue>() { - @Override - public WindowedValue apply(ValueWithMetadata input) { - return input.getWindowedValue(); - }}); - } - - @Override - public List> getPCollectionValuesWithMetadata(PCollection pc) { - List> elements = (List>) getPValue(pc); - elements = randomizeIfUnordered(elements, false /* not inPlaceAllowed */); - LOG.debug("Getting {} = {}", pc, elements); - return elements; - } - - @Override - public List> getPCollectionList(PCollectionList pcs) { - List> elementsList = new ArrayList<>(); - for (PCollection pc : pcs.getAll()) { - elementsList.add(getPCollection(pc)); - } - return elementsList; - } - - /** - * Retrieves the value indicated by the given {@link PCollectionView}. - * Note that within the {@link DoFnContext} a {@link PCollectionView} - * converts from this representation to a suitable side input value. - */ - @Override - public Iterable> getPCollectionView(PCollectionView view) { - Iterable> value = (Iterable>) getPValue(view); - LOG.debug("Getting {} = {}", view, value); - return value; - } - - /** - * If {@code testEncodability}, ensures that the {@link PCollection}'s coder and elements are - * encodable and decodable by encoding them and decoding them, and returning the result. - * Otherwise returns the argument elements. 
- */ - List> ensurePCollectionEncodable( - PCollection pc, List> elements) { - ensureCoderSerializable(pc.getCoder()); - if (!testEncodability) { - return elements; - } - List> elementsCopy = new ArrayList<>(elements.size()); - for (ValueWithMetadata element : elements) { - elementsCopy.add( - element.withValue(ensureElementEncodable(pc, element.getValue()))); - } - return elementsCopy; - } - - @Override - public T ensureElementEncodable(TypedPValue pvalue, T element) { - return ensureSerializableByCoder( - pvalue.getCoder(), element, "Within " + pvalue.toString()); - } - - @Override - public List randomizeIfUnordered(List elements, - boolean inPlaceAllowed) { - if (!testUnorderedness) { - return elements; - } - List elementsCopy = new ArrayList<>(elements); - Collections.shuffle(elementsCopy, rand); - return elementsCopy; - } - - @Override - public FunctionT ensureSerializable(FunctionT fn) { - if (!testSerializability) { - return fn; - } - return SerializableUtils.ensureSerializable(fn); - } - - @Override - public Coder ensureCoderSerializable(Coder coder) { - if (testSerializability) { - SerializableUtils.ensureSerializable(coder); - } - return coder; - } - - @Override - public T ensureSerializableByCoder( - Coder coder, T value, String errorContext) { - if (testSerializability) { - return SerializableUtils.ensureSerializableByCoder( - coder, value, errorContext); - } - return value; - } - - @Override - public CounterSet.AddCounterMutator getAddCounterMutator() { - return counters.getAddCounterMutator(); - } - - @Override - public String getStepName(PTransform transform) { - String stepName = stepNames.get(transform); - if (stepName == null) { - stepName = "s" + (stepNames.size() + 1); - stepNames.put(transform, stepName); - } - return stepName; - } - - /** - * Returns the CounterSet generated during evaluation, which includes - * user-defined Aggregators and may include system-defined counters. - */ - public CounterSet getCounters() { - return counters; - } - - /** - * Returns JobState.DONE in all situations. The Evaluator is not returned - * until the pipeline has been traversed, so it will either be returned - * after a successful run or the run call will terminate abnormally. - */ - @Override - public State getState() { - return State.DONE; - } - - @Override - public AggregatorValues getAggregatorValues(Aggregator aggregator) { - Map stepValues = new HashMap<>(); - for (PTransform step : aggregatorSteps.get(aggregator)) { - String stepName = String.format("user-%s-%s", stepNames.get(step), aggregator.getName()); - String fullName = fullNames.get(step); - Counter counter = counters.getExistingCounter(stepName); - if (counter == null) { - throw new IllegalArgumentException( - "Aggregator " + aggregator + " is not used in this pipeline"); - } - stepValues.put(fullName, (T) counter.getAggregate()); - } - return new MapAggregatorValues<>(stepValues); - } - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * The key by which GBK groups inputs - elements are grouped by the encoded form of the key, - * but the original key may be accessed as well. 
- */ - private static class GroupingKey { - private K key; - private byte[] encodedKey; - - public GroupingKey(K key, byte[] encodedKey) { - this.key = key; - this.encodedKey = encodedKey; - } - - public K getKey() { - return key; - } - - @Override - public boolean equals(Object o) { - if (o instanceof GroupingKey) { - GroupingKey that = (GroupingKey) o; - return Arrays.equals(this.encodedKey, that.encodedKey); - } else { - return false; - } - } - - @Override - public int hashCode() { - return Arrays.hashCode(encodedKey); - } - } - - private final DirectPipelineOptions options; - private boolean testSerializability; - private boolean testEncodability; - private boolean testUnorderedness; - - /** Returns a new DirectPipelineRunner. */ - private DirectPipelineRunner(DirectPipelineOptions options) { - this.options = options; - // (Re-)register standard IO factories. Clobbers any prior credentials. - IOChannelUtils.registerStandardIOFactories(options); - long randomSeed; - if (options.getDirectPipelineRunnerRandomSeed() != null) { - randomSeed = options.getDirectPipelineRunnerRandomSeed(); - } else { - randomSeed = new Random().nextLong(); - } - - LOG.debug("DirectPipelineRunner using random seed {}.", randomSeed); - rand = new Random(randomSeed); - - testSerializability = options.isTestSerializability(); - testEncodability = options.isTestEncodability(); - testUnorderedness = options.isTestUnorderedness(); - } - - /** - * Get the options used in this {@link Pipeline}. - */ - public DirectPipelineOptions getPipelineOptions() { - return options; - } - - @Override - public String toString() { - return "DirectPipelineRunner#" + hashCode(); - } - - public static void evaluateGroupByKeyOnly( - GroupByKeyOnly transform, - EvaluationContext context) { - PCollection> input = context.getInput(transform); - - List>> inputElems = - context.getPCollectionValuesWithMetadata(input); - - Coder keyCoder = GroupByKey.getKeyCoder(input.getCoder()); - - Map, List> groupingMap = new HashMap<>(); - - for (ValueWithMetadata> elem : inputElems) { - K key = elem.getValue().getKey(); - V value = elem.getValue().getValue(); - byte[] encodedKey; - try { - encodedKey = encodeToByteArray(keyCoder, key); - } catch (CoderException exn) { - // TODO: Put in better element printing: - // truncate if too long. 
-  public static <K, V> void evaluateGroupByKeyOnly(
-      GroupByKeyOnly<K, V> transform,
-      EvaluationContext context) {
-    PCollection<KV<K, V>> input = context.getInput(transform);
-
-    List<ValueWithMetadata<KV<K, V>>> inputElems =
-        context.getPCollectionValuesWithMetadata(input);
-
-    Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder());
-
-    Map<GroupingKey<K>, List<V>> groupingMap = new HashMap<>();
-
-    for (ValueWithMetadata<KV<K, V>> elem : inputElems) {
-      K key = elem.getValue().getKey();
-      V value = elem.getValue().getValue();
-      byte[] encodedKey;
-      try {
-        encodedKey = encodeToByteArray(keyCoder, key);
-      } catch (CoderException exn) {
-        // TODO: Put in better element printing:
-        // truncate if too long.
-        throw new IllegalArgumentException(
-            "unable to encode key " + key + " of input to " + transform
-            + " using " + keyCoder,
-            exn);
-      }
-      GroupingKey<K> groupingKey =
-          new GroupingKey<>(key, encodedKey);
-      List<V> values = groupingMap.get(groupingKey);
-      if (values == null) {
-        values = new ArrayList<V>();
-        groupingMap.put(groupingKey, values);
-      }
-      values.add(value);
-    }
-
-    List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems =
-        new ArrayList<>();
-    for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) {
-      GroupingKey<K> groupingKey = entry.getKey();
-      K key = groupingKey.getKey();
-      List<V> values = entry.getValue();
-      values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */);
-      outputElems.add(ValueWithMetadata
-          .of(WindowedValue.valueInEmptyWindows(KV.<K, Iterable<V>>of(key, values)))
-          .withKey(key));
-    }
-
-    context.setPCollectionValuesWithMetadata(context.getOutput(transform),
-        outputElems);
-  }
-
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  public static void registerGroupByKeyOnly() {
-    registerDefaultTransformEvaluator(
-        GroupByKeyOnly.class,
-        new TransformEvaluator<GroupByKeyOnly>() {
-          @Override
-          public void evaluate(
-              GroupByKeyOnly transform,
-              EvaluationContext context) {
-            evaluateGroupByKeyOnly(transform, context);
-          }
-        });
-  }
-
-  static {
-    registerGroupByKeyOnly();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 7c6fed3..93917f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -20,16 +20,12 @@ package org.apache.beam.sdk.transforms;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableLikeCoder;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionList;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * {@code Flatten<T>} takes multiple {@code PCollection<T>}s bundled
  * into a {@code PCollectionList<T>} and returns a single
@@ -189,32 +185,4 @@
         .setCoder(elemCoder);
     }
   }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  static {
-    DirectPipelineRunner.registerDefaultTransformEvaluator(
-        FlattenPCollectionList.class,
-        new DirectPipelineRunner.TransformEvaluator<FlattenPCollectionList>() {
-          @Override
-          public void evaluate(
-              FlattenPCollectionList transform,
-              DirectPipelineRunner.EvaluationContext context) {
-            evaluateHelper(transform, context);
-          }
-        });
-  }
-
-  private static <T> void evaluateHelper(
-      FlattenPCollectionList<T> transform,
-      DirectPipelineRunner.EvaluationContext context) {
-    List<DirectPipelineRunner.ValueWithMetadata<T>> outputElems = new ArrayList<>();
-    PCollectionList<T> inputs = context.getInput(transform);
-
-    for (PCollection<T> input : inputs.getAll()) {
-      outputElems.addAll(context.getPCollectionValuesWithMetadata(input));
-    }
-
-    context.setPCollectionValuesWithMetadata(context.getOutput(transform), outputElems);
-  }
 }
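[Note: evaluateGroupByKeyOnly() in the DirectPipelineRunner hunk above defines key equality by the key's encoded bytes rather than by Object.equals(), which is why it wraps each key in a GroupingKey before building the HashMap. A stand-alone sketch of that trick, using UTF-8 bytes where the real code uses the key Coder; names are illustrative:]

    import java.nio.charset.StandardCharsets;
    import java.util.*;

    public class GroupByEncodedKey {
      // Wraps a key with its encoded form; equality is on the bytes, like GroupingKey.
      static final class EncodedKey<K> {
        final K key;
        final byte[] bytes;
        EncodedKey(K key, byte[] bytes) { this.key = key; this.bytes = bytes; }
        @Override public boolean equals(Object o) {
          return o instanceof EncodedKey && Arrays.equals(bytes, ((EncodedKey<?>) o).bytes);
        }
        @Override public int hashCode() { return Arrays.hashCode(bytes); }
      }

      public static Map<String, List<Integer>> group(List<Map.Entry<String, Integer>> input) {
        Map<EncodedKey<String>, List<Integer>> grouping = new HashMap<>();
        for (Map.Entry<String, Integer> kv : input) {
          // A real runner encodes with the key's Coder; UTF-8 is a stand-in here.
          EncodedKey<String> k = new EncodedKey<>(kv.getKey(),
              kv.getKey().getBytes(StandardCharsets.UTF_8));
          grouping.computeIfAbsent(k, unused -> new ArrayList<>()).add(kv.getValue());
        }
        Map<String, List<Integer>> result = new HashMap<>();
        for (Map.Entry<EncodedKey<String>, List<Integer>> e : grouping.entrySet()) {
          result.put(e.getKey().key, e.getValue());
        }
        return result;
      }

      public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = Arrays.asList(
            new AbstractMap.SimpleEntry<>("a", 1),
            new AbstractMap.SimpleEntry<>("b", 2),
            new AbstractMap.SimpleEntry<>("a", 3));
        System.out.println(group(input)); // {a=[1, 3], b=[2]}
      }
    }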
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 511f0d8..cb7d372 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -21,27 +21,12 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.DirectModeExecutionContext;
-import org.apache.beam.sdk.util.DirectSideInputReader;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunnerBase;
-import org.apache.beam.sdk.util.DoFnRunners;
-import org.apache.beam.sdk.util.IllegalMutationException;
-import org.apache.beam.sdk.util.MutationDetector;
-import org.apache.beam.sdk.util.MutationDetectors;
-import org.apache.beam.sdk.util.PTuple;
 import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.StringUtils;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -50,16 +35,10 @@ import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypedPValue;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
 
 import java.io.Serializable;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
 
 /**
  * {@link ParDo} is the core element-wise transform in Google Cloud
@@ -84,7 +63,7 @@ import javax.annotation.Nullable;
  * <p>Conceptually, when a {@link ParDo} transform is executed, the
  * elements of the input {@link PCollection} are first divided up
  * into some number of "bundles". These are farmed off to distributed
- * worker machines (or run locally, if using the {@link DirectPipelineRunner}).
+ * worker machines (or run locally, if using the {@code DirectRunner}).
 * For each bundle of input elements processing proceeds as follows:
 *
 * <ol>
@@ -1072,288 +1051,11 @@ public class ParDo {
     }
   }
 
-  /////////////////////////////////////////////////////////////////////////////
-
-  static {
-    DirectPipelineRunner.registerDefaultTransformEvaluator(
-        Bound.class,
-        new DirectPipelineRunner.TransformEvaluator<Bound>() {
-          @Override
-          public void evaluate(
-              Bound transform,
-              DirectPipelineRunner.EvaluationContext context) {
-            evaluateSingleHelper(transform, context);
-          }
-        });
-  }
-
-  private static <InputT, OutputT> void evaluateSingleHelper(
-      Bound<InputT, OutputT> transform,
-      DirectPipelineRunner.EvaluationContext context) {
-    TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
-
-    DirectModeExecutionContext executionContext = DirectModeExecutionContext.create();
-
-    PCollectionTuple outputs = PCollectionTuple.of(mainOutputTag, context.getOutput(transform));
-
-    evaluateHelper(
-        transform.fn,
-        context.getStepName(transform),
-        context.getInput(transform),
-        transform.sideInputs,
-        mainOutputTag,
-        Collections.<TupleTag<?>>emptyList(),
-        outputs,
-        context,
-        executionContext);
-
-    context.setPCollectionValuesWithMetadata(
-        context.getOutput(transform),
-        executionContext.getOutput(mainOutputTag));
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  static {
-    DirectPipelineRunner.registerDefaultTransformEvaluator(
-        BoundMulti.class,
-        new DirectPipelineRunner.TransformEvaluator<BoundMulti>() {
-          @Override
-          public void evaluate(
-              BoundMulti transform,
-              DirectPipelineRunner.EvaluationContext context) {
-            evaluateMultiHelper(transform, context);
-          }
-        });
-  }
-
-  private static <InputT, OutputT> void evaluateMultiHelper(
-      BoundMulti<InputT, OutputT> transform,
-      DirectPipelineRunner.EvaluationContext context) {
-
-    DirectModeExecutionContext executionContext = DirectModeExecutionContext.create();
-
-    evaluateHelper(
-        transform.fn,
-        context.getStepName(transform),
-        context.getInput(transform),
-        transform.sideInputs,
-        transform.mainOutputTag,
-        transform.sideOutputTags.getAll(),
-        context.getOutput(transform),
-        context,
-        executionContext);
-
-    for (Map.Entry<TupleTag<?>, PCollection<?>> entry
-        : context.getOutput(transform).getAll().entrySet()) {
-      @SuppressWarnings("unchecked")
-      TupleTag<Object> tag = (TupleTag<Object>) entry.getKey();
-      @SuppressWarnings("unchecked")
-      PCollection<Object> pc = (PCollection<Object>) entry.getValue();
-
-      context.setPCollectionValuesWithMetadata(
-          pc,
-          (tag == transform.mainOutputTag
-              ? executionContext.getOutput(tag)
-              : executionContext.getSideOutput(tag)));
-    }
-  }
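[Note: the evaluation path below hands the user's DoFn to context.ensureSerializable(...) before running it, i.e. a serialize/deserialize round trip (SerializableUtils.ensureSerializable) that surfaces non-serializable functions immediately instead of on a remote worker. A minimal sketch of such a round-trip check in plain java.io; names are illustrative, not the SDK's implementation:]

    import java.io.*;

    public class SerializableCheck {
      // Round-trips fn through Java serialization and returns the decoded copy,
      // proving both directions work before the function is ever executed.
      public static <T extends Serializable> T ensureSerializable(T fn) {
        try {
          ByteArrayOutputStream bytes = new ByteArrayOutputStream();
          ObjectOutputStream out = new ObjectOutputStream(bytes);
          out.writeObject(fn);
          out.flush();
          ObjectInputStream in =
              new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
          @SuppressWarnings("unchecked")
          T copy = (T) in.readObject();
          return copy;
        } catch (IOException | ClassNotFoundException e) {
          throw new IllegalArgumentException("unable to serialize " + fn, e);
        }
      }
    }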
-
-  /**
-   * Evaluates a single-output or multi-output {@link ParDo} directly.
-   *
-   * <p>This evaluation method is intended for use in testing scenarios; it is designed for clarity
-   * and correctness-checking, not speed.
-   *
-   * <p>Of particular note, this performs best-effort checking that inputs and outputs are not
-   * mutated in violation of the requirements upon a {@link DoFn}.
-   */
-  private static <InputT, OutputT> void evaluateHelper(
-      DoFn<InputT, OutputT> doFn,
-      String stepName,
-      PCollection<InputT> input,
-      List<PCollectionView<?>> sideInputs,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      PCollectionTuple outputs,
-      DirectPipelineRunner.EvaluationContext context,
-      DirectModeExecutionContext executionContext) {
-    // TODO: Run multiple shards?
-    DoFn<InputT, OutputT> fn = context.ensureSerializable(doFn);
-
-    SideInputReader sideInputReader = makeSideInputReader(context, sideInputs);
-
-    // When evaluating via the DirectPipelineRunner, this output manager checks each output for
-    // illegal mutations when the next output comes along. We then verify again after finishBundle()
-    // The common case we expect this to catch is a user mutating an input in order to repeatedly
-    // emit "variations".
-    ImmutabilityCheckingOutputManager<InputT> outputManager =
-        new ImmutabilityCheckingOutputManager<>(
-            fn.getClass().getSimpleName(),
-            new DoFnRunnerBase.ListOutputManager(),
-            outputs);
-
-    DoFnRunner<InputT, OutputT> fnRunner =
-        DoFnRunners.createDefault(
-            context.getPipelineOptions(),
-            fn,
-            sideInputReader,
-            outputManager,
-            mainOutputTag,
-            sideOutputTags,
-            executionContext.getOrCreateStepContext(stepName, stepName),
-            context.getAddCounterMutator(),
-            input.getWindowingStrategy());
-
-    fnRunner.startBundle();
-
-    for (DirectPipelineRunner.ValueWithMetadata<InputT> elem
-        : context.getPCollectionValuesWithMetadata(input)) {
-      if (elem.getValue() instanceof KV) {
-        // In case the DoFn needs keyed state, set the implicit keys to the keys
-        // in the input elements.
-        @SuppressWarnings("unchecked")
-        KV<?, ?> kvElem = (KV<?, ?>) elem.getValue();
-        executionContext.setKey(kvElem.getKey());
-      } else {
-        executionContext.setKey(elem.getKey());
-      }
-
-      // We check the input for mutations only through the call span of processElement.
-      // This will miss some cases, but the check is ad hoc and best effort. The common case
-      // is that the input is mutated to be used for output.
-      try {
-        MutationDetector inputMutationDetector = MutationDetectors.forValueWithCoder(
-            elem.getWindowedValue().getValue(), input.getCoder());
-        @SuppressWarnings("unchecked")
-        WindowedValue<InputT> windowedElem = ((WindowedValue<InputT>) elem.getWindowedValue());
-        fnRunner.processElement(windowedElem);
-        inputMutationDetector.verifyUnmodified();
-      } catch (CoderException e) {
-        throw UserCodeException.wrap(e);
-      } catch (IllegalMutationException exn) {
-        throw new IllegalMutationException(
-            String.format("DoFn %s mutated input value %s of class %s (new value was %s)."
-                + " Input values must not be mutated in any way.",
-                fn.getClass().getSimpleName(),
-                exn.getSavedValue(), exn.getSavedValue().getClass(), exn.getNewValue()),
-            exn.getSavedValue(),
-            exn.getNewValue(),
-            exn);
-      }
-    }
-
-    // Note that the input could have been retained and mutated prior to this final output,
-    // but for now it degrades readability too much to be worth trying to catch that particular
-    // corner case.
-    fnRunner.finishBundle();
-    outputManager.verifyLatestOutputsUnmodified();
-  }
-
-  private static SideInputReader makeSideInputReader(
-      DirectPipelineRunner.EvaluationContext context, List<PCollectionView<?>> sideInputs) {
-    PTuple sideInputValues = PTuple.empty();
-    for (PCollectionView<?> view : sideInputs) {
-      sideInputValues = sideInputValues.and(
-          view.getTagInternal(),
-          context.getPCollectionView(view));
-    }
-    return DirectSideInputReader.of(sideInputValues);
-  }
-
   private static void populateDisplayData(
       DisplayData.Builder builder, DoFn<?, ?> fn, Class<?> fnClass) {
     builder
         .include(fn)
         .add(DisplayData.item("fn", fnClass)
-            .withLabel("Transform Function"));
-  }
-
-  /**
-   * A {@code DoFnRunner.OutputManager} that provides facilities for checking output values for
-   * illegal mutations.
-   *
-   * <p>When used via the try-with-resources pattern, it is guaranteed that every value passed
-   * to {@link #output} will have been checked for illegal mutation.
-   */
-  private static class ImmutabilityCheckingOutputManager<InputT>
-      implements DoFnRunners.OutputManager, AutoCloseable {
-
-    private final DoFnRunners.OutputManager underlyingOutputManager;
-    private final ConcurrentMap<TupleTag<?>, MutationDetector> mutationDetectorForTag;
-    private final PCollectionTuple outputs;
-    private String doFnName;
-
-    public ImmutabilityCheckingOutputManager(
-        String doFnName,
-        DoFnRunners.OutputManager underlyingOutputManager,
-        PCollectionTuple outputs) {
-      this.doFnName = doFnName;
-      this.underlyingOutputManager = underlyingOutputManager;
-      this.outputs = outputs;
-      this.mutationDetectorForTag = Maps.newConcurrentMap();
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-
-      // Skip verifying undeclared outputs, since we don't have coders for them.
-      if (outputs.has(tag)) {
-        try {
-          MutationDetector newDetector =
-              MutationDetectors.forValueWithCoder(
-                  output.getValue(), outputs.get(tag).getCoder());
-          MutationDetector priorDetector = mutationDetectorForTag.put(tag, newDetector);
-          verifyOutputUnmodified(priorDetector);
-        } catch (CoderException e) {
-          throw UserCodeException.wrap(e);
-        }
-      }
-
-      // Actually perform the output.
-      underlyingOutputManager.output(tag, output);
-    }
-
-    /**
-     * Throws {@link IllegalMutationException} if the prior output for any tag has been mutated
-     * since being output.
-     */
-    public void verifyLatestOutputsUnmodified() {
-      for (MutationDetector detector : mutationDetectorForTag.values()) {
-        verifyOutputUnmodified(detector);
-      }
-    }
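[Note: the MutationDetectors used above snapshot an element's encoded form when it is first seen and re-encode the live value later; any byte difference means user code mutated it. A self-contained sketch of that snapshot-and-compare idea, with Java serialization standing in for a Coder; names are illustrative:]

    import java.io.*;
    import java.util.Arrays;

    public class MutationCheck {
      private final Serializable value;
      private final byte[] snapshot; // encoded form captured at output time

      public MutationCheck(Serializable value) throws IOException {
        this.value = value;
        this.snapshot = encode(value);
      }

      // Re-encode and compare; a difference means the value changed after output.
      public void verifyUnmodified() throws IOException {
        if (!Arrays.equals(snapshot, encode(value))) {
          throw new IllegalStateException("value " + value + " was mutated after being output");
        }
      }

      private static byte[] encode(Serializable v) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(v);
        out.flush();
        return bytes.toByteArray();
      }
    }

[Outputting an ArrayList and then calling add(...) on it would make verifyUnmodified() throw, which is the class of DoFn bug the removed ImmutabilityCheckingOutputManager reports.]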
-
-    /**
-     * Adapts the error message from the provided {@code detector}.
-     *
-     * <p>The {@code detector} may be null, in which case no check is performed. This is merely
-     * to consolidate null checking to this method.
-     */
-    private void verifyOutputUnmodified(@Nullable MutationDetector detector) {
-      if (detector == null) {
-        return;
-      }
-
-      try {
-        detector.verifyUnmodified();
-      } catch (IllegalMutationException exn) {
-        throw new IllegalMutationException(String.format(
-            "DoFn %s mutated value %s after it was output (new value was %s)."
-                + " Values must not be mutated in any way after being output.",
-            doFnName, exn.getSavedValue(), exn.getNewValue()),
-            exn.getSavedValue(), exn.getNewValue(),
-            exn);
-      }
-    }
-
-    /**
-     * When used in a {@code try}-with-resources block, verifies all of the latest outputs upon
-     * {@link #close()}.
-     */
-    @Override
-    public void close() {
-      verifyLatestOutputsUnmodified();
-    }
+            .withLabel("Transform Function"));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 3df915b..7a97c13 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -17,10 +17,8 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -445,27 +443,5 @@ public class View {
     public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
       return view;
     }
-
-    static {
-      DirectPipelineRunner.registerDefaultTransformEvaluator(
-          CreatePCollectionView.class,
-          new DirectPipelineRunner.TransformEvaluator<CreatePCollectionView>() {
-            @SuppressWarnings("rawtypes")
-            @Override
-            public void evaluate(
-                CreatePCollectionView transform,
-                DirectPipelineRunner.EvaluationContext context) {
-              evaluateTyped(transform, context);
-            }
-
-            private <ElemT, ViewT> void evaluateTyped(
-                CreatePCollectionView<ElemT, ViewT> transform,
-                DirectPipelineRunner.EvaluationContext context) {
-              List<WindowedValue<ElemT>> elems =
-                  context.getPCollectionWindowedValues(context.getInput(transform));
-              context.setPCollectionView(context.getOutput(transform), elems);
-            }
-          });
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java
deleted file mode 100644
index 85e36dd..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.runners.DirectPipelineRunner.ValueWithMetadata;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * {@link ExecutionContext} for use in direct mode.
- */
-public class DirectModeExecutionContext
-    extends BaseExecutionContext<DirectModeExecutionContext.StepContext> {
-
-  private Object key;
-  private List<ValueWithMetadata<?>> output = Lists.newArrayList();
-  private Map<TupleTag<?>, List<ValueWithMetadata<?>>> sideOutputs = Maps.newHashMap();
-
-  protected DirectModeExecutionContext() {}
-
-  public static DirectModeExecutionContext create() {
-    return new DirectModeExecutionContext();
-  }
-
-  @Override
-  protected StepContext createStepContext(String stepName, String transformName) {
-    return new StepContext(this, stepName, transformName);
-  }
-
-  public Object getKey() {
-    return key;
-  }
-
-  public void setKey(Object newKey) {
-    // The direct mode runner may reorder elements, so we need to keep
-    // around the state used for each key.
-    for (ExecutionContext.StepContext stepContext : getAllStepContexts()) {
-      ((StepContext) stepContext).switchKey(newKey);
-    }
-    key = newKey;
-  }
-
-  @Override
-  public void noteOutput(WindowedValue<?> outputElem) {
-    output.add(ValueWithMetadata.of(outputElem).withKey(getKey()));
-  }
-
-  @Override
-  public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> outputElem) {
-    List<ValueWithMetadata<?>> output = sideOutputs.get(tag);
-    if (output == null) {
-      output = Lists.newArrayList();
-      sideOutputs.put(tag, output);
-    }
-    output.add(ValueWithMetadata.of(outputElem).withKey(getKey()));
-  }
-
-  public <T> List<ValueWithMetadata<T>> getOutput(@SuppressWarnings("unused") TupleTag<T> tag) {
-    @SuppressWarnings({"unchecked", "rawtypes"}) // Cast not expressible without rawtypes
-    List<ValueWithMetadata<T>> typedOutput = (List) output;
-    return typedOutput;
-  }
-
-  public <T> List<ValueWithMetadata<T>> getSideOutput(TupleTag<T> tag) {
-    if (sideOutputs.containsKey(tag)) {
-      @SuppressWarnings({"unchecked", "rawtypes"}) // Cast not expressible without rawtypes
-      List<ValueWithMetadata<T>> typedOutput = (List) sideOutputs.get(tag);
-      return typedOutput;
-    } else {
-      return Lists.newArrayList();
-    }
-  }
-
-  /**
-   * {@link ExecutionContext.StepContext} used in direct mode.
-   */
-  public static class StepContext extends BaseExecutionContext.StepContext {
-
-    /** A map from each key to the state associated with it. */
-    private final Map<Object, InMemoryStateInternals<Object>> stateInternals = Maps.newHashMap();
-    private InMemoryStateInternals<Object> currentStateInternals = null;
-
-    private StepContext(ExecutionContext executionContext, String stepName, String transformName) {
-      super(executionContext, stepName, transformName);
-      switchKey(null);
-    }
-
-    public void switchKey(Object newKey) {
-      currentStateInternals = stateInternals.get(newKey);
-      if (currentStateInternals == null) {
-        currentStateInternals = InMemoryStateInternals.forKey(newKey);
-        stateInternals.put(newKey, currentStateInternals);
-      }
-    }
-
-    @Override
-    public StateInternals<Object> stateInternals() {
-      return checkNotNull(currentStateInternals);
-    }
-
-    @Override
-    public TimerInternals timerInternals() {
-      throw new UnsupportedOperationException("Direct mode cannot return timerInternals");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
index 75861fe..58b10a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.util;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
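[Note: DirectModeExecutionContext.StepContext above keeps one InMemoryStateInternals per key and swaps the active one in switchKey(), so elements processed in any order still see their own key's state, including the null key. The pattern reduces to a lazily populated map from key to state; a compact generic sketch with illustrative names:]

    import java.util.HashMap;
    import java.util.Map;

    public class KeyedState<StateT> {
      public interface Factory<StateT> {
        StateT createFor(Object key);
      }

      private final Map<Object, StateT> stateByKey = new HashMap<>(); // HashMap permits a null key
      private final Factory<StateT> factory;
      private StateT current;

      public KeyedState(Factory<StateT> factory) {
        this.factory = factory;
        switchKey(null); // mirror the removed StepContext, which starts on the null key
      }

      // Make `key`'s state current, creating it on first use and retaining it
      // so a later return to the same key sees the same state.
      public void switchKey(Object key) {
        current = stateByKey.computeIfAbsent(key, factory::createFor);
      }

      public StateT current() {
        return current;
      }
    }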