Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 33EC6200C5E for ; Fri, 17 Mar 2017 22:46:37 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 32B68160B8F; Fri, 17 Mar 2017 21:46:37 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E0519160B96 for ; Fri, 17 Mar 2017 22:46:34 +0100 (CET) Received: (qmail 22679 invoked by uid 500); 17 Mar 2017 21:46:33 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 22313 invoked by uid 99); 17 Mar 2017 21:46:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Mar 2017 21:46:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E5CE3DFF7C; Fri, 17 Mar 2017 21:46:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.apache.org Date: Fri, 17 Mar 2017 21:46:55 -0000 Message-Id: <185564634a98498481f170694cea375f@git.apache.org> In-Reply-To: <7d5726b420a54a668c2192f9d38f9f27@git.apache.org> References: <7d5726b420a54a668c2192f9d38f9f27@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [24/50] [abbrv] beam git commit: Remove Pipeline.getRunner archived-at: Fri, 17 Mar 2017 21:46:37 -0000 Remove Pipeline.getRunner Runners need not be instantiated until after pipeline construction, so they should not be exposed by the Pipeline class. 
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d41fe1df Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d41fe1df Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d41fe1df Branch: refs/heads/gearpump-runner Commit: d41fe1df26329479b82cc59d260998f2b88b4799 Parents: 2c2424c Author: Thomas Groh Authored: Thu Mar 2 10:54:29 2017 -0800 Committer: Thomas Groh Committed: Fri Mar 10 09:40:50 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/direct/DirectRunner.java | 95 ++++++++++---------- .../direct/TestStreamEvaluatorFactory.java | 22 +++-- .../direct/TestStreamEvaluatorFactoryTest.java | 6 +- .../BatchStatefulParDoOverridesTest.java | 4 +- .../DataflowPipelineTranslatorTest.java | 39 ++++---- .../runners/dataflow/DataflowRunnerTest.java | 4 +- .../testing/TestDataflowRunnerTest.java | 50 +++++------ .../main/java/org/apache/beam/sdk/Pipeline.java | 7 -- 8 files changed, 110 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index f56d225..ce8dbc0 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -70,53 +70,6 @@ import org.joda.time.Instant; */ public class DirectRunner extends PipelineRunner { /** - * The default set of transform overrides to use in the {@link DirectRunner}. - * - *

The order in which overrides is applied is important, as some overrides are expanded into a - * composite. If the composite contains {@link PTransform PTransforms} which are also overridden, - * these PTransforms must occur later in the iteration order. {@link ImmutableMap} has an - * iteration order based on the order at which elements are added to it. - */ - @SuppressWarnings("rawtypes") - private static Map defaultTransformOverrides = - ImmutableMap.builder() - .put( - PTransformMatchers.writeWithRunnerDeterminedSharding(), - new WriteWithShardingFactory()) /* Uses a view internally. */ - .put( - PTransformMatchers.classEqualTo(CreatePCollectionView.class), - new ViewOverrideFactory()) /* Uses pardos and GBKs */ - .put( - PTransformMatchers.classEqualTo(TestStream.class), - new DirectTestStreamFactory()) /* primitive */ - /* Single-output ParDos are implemented in terms of Multi-output ParDos. Any override - that is applied to a multi-output ParDo must first have all matching Single-output ParDos - converted to match. - */ - .put(PTransformMatchers.splittableParDoSingle(), new ParDoSingleViaMultiOverrideFactory()) - .put( - PTransformMatchers.stateOrTimerParDoSingle(), - new ParDoSingleViaMultiOverrideFactory()) - // SplittableParMultiDo is implemented in terms of nonsplittable single ParDos - .put(PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory()) - // state and timer pardos are implemented in terms of nonsplittable single ParDos - .put(PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory()) - .put( - PTransformMatchers.classEqualTo(ParDo.Bound.class), - new ParDoSingleViaMultiOverrideFactory()) /* returns a BoundMulti */ - .put( - PTransformMatchers.classEqualTo(BoundMulti.class), - /* returns one of two primitives; SplittableParDos are replaced above. 
*/ - new ParDoMultiOverrideFactory()) - .put( - PTransformMatchers.classEqualTo(GBKIntoKeyedWorkItems.class), - new DirectGBKIntoKeyedWorkItemsOverrideFactory()) /* Returns a GBKO */ - .put( - PTransformMatchers.classEqualTo(GroupByKey.class), - new DirectGroupByKeyOverrideFactory()) /* returns two chained primitives. */ - .build(); - - /** * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is * a part of at a later point. This is an uncommitted bundle and can have elements added to it. @@ -309,7 +262,7 @@ public class DirectRunner extends PipelineRunner { @Override public DirectPipelineResult run(Pipeline pipeline) { for (Map.Entry override : - defaultTransformOverrides.entrySet()) { + defaultTransformOverrides().entrySet()) { pipeline.replace(override.getKey(), override.getValue()); } MetricsEnvironment.setMetricsSupported(true); @@ -361,6 +314,52 @@ public class DirectRunner extends PipelineRunner { } /** + * The default set of transform overrides to use in the {@link DirectRunner}. + * + *

The order in which overrides are applied is important, as some overrides are expanded into a + * composite. If the composite contains {@link PTransform PTransforms} which are also overridden, + * these PTransforms must occur later in the iteration order. {@link ImmutableMap} has an + * iteration order based on the order in which elements are added to it. + */ + @SuppressWarnings("rawtypes") + private Map defaultTransformOverrides() { + return ImmutableMap.builder() + .put( + PTransformMatchers.writeWithRunnerDeterminedSharding(), + new WriteWithShardingFactory()) /* Uses a view internally. */ + .put( + PTransformMatchers.classEqualTo(CreatePCollectionView.class), + new ViewOverrideFactory()) /* Uses pardos and GBKs */ + .put( + PTransformMatchers.classEqualTo(TestStream.class), + new DirectTestStreamFactory(this)) /* primitive */ + /* Single-output ParDos are implemented in terms of Multi-output ParDos. Any override + that is applied to a multi-output ParDo must first have all matching Single-output ParDos + converted to match. + */ + .put(PTransformMatchers.splittableParDoSingle(), new ParDoSingleViaMultiOverrideFactory()) + .put(PTransformMatchers.stateOrTimerParDoSingle(), new ParDoSingleViaMultiOverrideFactory()) + // SplittableParMultiDo is implemented in terms of nonsplittable single ParDos + .put(PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory()) + // state and timer pardos are implemented in terms of nonsplittable single ParDos + .put(PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory()) + .put( + PTransformMatchers.classEqualTo(ParDo.Bound.class), + new ParDoSingleViaMultiOverrideFactory()) /* returns a BoundMulti */ + .put( + PTransformMatchers.classEqualTo(BoundMulti.class), + /* returns one of two primitives; SplittableParDos are replaced above. 
*/ + new ParDoMultiOverrideFactory()) + .put( + PTransformMatchers.classEqualTo(GBKIntoKeyedWorkItems.class), + new DirectGBKIntoKeyedWorkItemsOverrideFactory()) /* Returns a GBKO */ + .put( + PTransformMatchers.classEqualTo(GroupByKey.class), + new DirectGroupByKeyOverrideFactory()) /* returns two chained primitives. */ + .build(); + } + + /** * The result of running a {@link Pipeline} with the {@link DirectRunner}. * *

Throws {@link UnsupportedOperationException} for all methods. http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index 628aa23..0dd8919 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -18,8 +18,6 @@ package org.apache.beam.runners.direct; -import static com.google.common.base.Preconditions.checkState; - import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; @@ -35,7 +33,6 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.PTransformOverrideFactory; -import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.testing.TestStream.ElementEvent; import org.apache.beam.sdk.testing.TestStream.Event; @@ -166,11 +163,16 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { static class DirectTestStreamFactory implements PTransformOverrideFactory, TestStream> { + private final DirectRunner runner; + + DirectTestStreamFactory(DirectRunner runner) { + this.runner = runner; + } @Override public PTransform> getReplacementTransform( TestStream transform) { - return new DirectTestStream<>(transform); + return new DirectTestStream<>(runner, transform); } @Override @@ -185,22 +187,18 @@ class TestStreamEvaluatorFactory 
implements TransformEvaluatorFactory { } static class DirectTestStream extends PTransform> { + private final transient DirectRunner runner; private final TestStream original; @VisibleForTesting - DirectTestStream(TestStream transform) { + DirectTestStream(DirectRunner runner, TestStream transform) { + this.runner = runner; this.original = transform; } @Override public PCollection expand(PBegin input) { - PipelineRunner runner = input.getPipeline().getRunner(); - checkState( - runner instanceof DirectRunner, - "%s can only be used when running with the %s", - getClass().getSimpleName(), - DirectRunner.class.getSimpleName()); - ((DirectRunner) runner).setClockSupplier(new TestClockSupplier()); + runner.setClockSupplier(new TestClockSupplier()); return PCollection.createPrimitiveOutputInternal( input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED) .setCoder(original.getValueCoder()); http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java index 9ed72d5..fc689fe 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java @@ -60,10 +60,12 @@ public class TestStreamEvaluatorFactoryTest { @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + private DirectRunner runner; @Before public void setup() { context = mock(EvaluationContext.class); + runner = DirectRunner.fromOptions(TestPipeline.testingPipelineOptions()); factory = new TestStreamEvaluatorFactory(context); 
bundleFactory = ImmutableListBundleFactory.create(); } @@ -80,7 +82,7 @@ public class TestStreamEvaluatorFactoryTest { .advanceProcessingTime(Duration.standardMinutes(10)) .advanceWatermarkToInfinity(); PCollection streamVals = - p.apply(new DirectTestStream(testStream)); + p.apply(new DirectTestStream(runner, testStream)); TestClock clock = new TestClock(); when(context.getClock()).thenReturn(clock); @@ -180,7 +182,7 @@ public class TestStreamEvaluatorFactoryTest { @Test public void overrideFactoryGetInputSucceeds() { - DirectTestStreamFactory factory = new DirectTestStreamFactory<>(); + DirectTestStreamFactory factory = new DirectTestStreamFactory<>(runner); PBegin begin = factory.getInput(Collections.emptyList(), p); assertThat(begin.getPipeline(), Matchers.equalTo(p)); } http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java index ef3e414..899902a 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java @@ -71,7 +71,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable { DummyStatefulDoFn fn = new DummyStatefulDoFn(); pipeline.apply(Create.of(KV.of(1, 2))).apply(ParDo.of(fn)); - DataflowRunner runner = (DataflowRunner) pipeline.getRunner(); + DataflowRunner runner = DataflowRunner.fromOptions(options); runner.replaceTransforms(pipeline); assertThat(findBatchStatefulDoFn(pipeline), equalTo((DoFn) fn)); } @@ 
-89,7 +89,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable { .apply(Create.of(KV.of(1, 2))) .apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.empty()).of(fn)); - DataflowRunner runner = (DataflowRunner) pipeline.getRunner(); + DataflowRunner runner = DataflowRunner.fromOptions(options); runner.replaceTransforms(pipeline); assertThat(findBatchStatefulDoFn(pipeline), equalTo((DoFn) fn)); } http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 660e92e..813e76d 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -190,7 +190,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowRunner) p.getRunner(), Collections.emptyList()) + p, DataflowRunner.fromOptions(options), Collections.emptyList()) .getJob(); Map sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions(); @@ -223,7 +223,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowRunner) p.getRunner(), Collections.emptyList()) + p, DataflowRunner.fromOptions(options), Collections.emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -240,7 +240,7 @@ public class DataflowPipelineTranslatorTest 
implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowRunner) p.getRunner(), Collections.emptyList()) + p, DataflowRunner.fromOptions(options), Collections.emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -259,7 +259,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowRunner) p.getRunner(), Collections.emptyList()) + p, DataflowRunner.fromOptions(options), Collections.emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -276,7 +276,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowRunner) p.getRunner(), Collections.emptyList()) + p, DataflowRunner.fromOptions(options), Collections.emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -292,7 +292,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowRunner) p.getRunner(), Collections.emptyList()) + p, DataflowRunner.fromOptions(options), Collections.emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -328,7 +328,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowRunner) p.getRunner(), Collections.emptyList()) + p, DataflowRunner.fromOptions(options), Collections.emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -363,7 +363,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowRunner) p.getRunner(), Collections.emptyList()) + p, 
DataflowRunner.fromOptions(options), Collections.emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -397,7 +397,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowRunner) p.getRunner(), Collections.emptyList()) + p, DataflowRunner.fromOptions(options), Collections.emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -417,7 +417,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowRunner) p.getRunner(), Collections.emptyList()) + p, DataflowRunner.fromOptions(options), Collections.emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -438,7 +438,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowRunner) p.getRunner(), Collections.emptyList()) + p, DataflowRunner.fromOptions(options), Collections.emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -653,7 +653,8 @@ public class DataflowPipelineTranslatorTest implements Serializable { @Test public void testMultiGraphPipelineSerialization() throws Exception { - Pipeline p = Pipeline.create(buildPipelineOptions()); + DataflowPipelineOptions options = buildPipelineOptions(); + Pipeline p = Pipeline.create(options); PCollection input = p.begin() .apply(Create.of(1, 2, 3)); @@ -666,7 +667,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { // Check that translation doesn't fail. 
JobSpecification jobSpecification = t.translate( - p, (DataflowRunner) p.getRunner(), Collections.emptyList()); + p, DataflowRunner.fromOptions(options), Collections.emptyList()); assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob()); } @@ -710,7 +711,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { // Check that translation doesn't fail. JobSpecification jobSpecification = t.translate( pipeline, - (DataflowRunner) pipeline.getRunner(), + DataflowRunner.fromOptions(options), Collections.emptyList()); assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob()); } @@ -737,7 +738,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage")))); t.translate( pipeline, - (DataflowRunner) pipeline.getRunner(), + DataflowRunner.fromOptions(options), Collections.emptyList()); } @@ -764,7 +765,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { // Check that translation does not fail. 
t.translate( pipeline, - (DataflowRunner) pipeline.getRunner(), + DataflowRunner.fromOptions(options), Collections.emptyList()); } @@ -785,7 +786,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { translator .translate( pipeline, - (DataflowRunner) pipeline.getRunner(), + DataflowRunner.fromOptions(options), Collections.emptyList()) .getJob(); assertAllStepOutputsHaveUniqueIds(job); @@ -817,7 +818,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { translator .translate( pipeline, - (DataflowRunner) pipeline.getRunner(), + DataflowRunner.fromOptions(options), Collections.emptyList()) .getJob(); assertAllStepOutputsHaveUniqueIds(job); @@ -1011,7 +1012,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { translator .translate( pipeline, - (DataflowRunner) pipeline.getRunner(), + DataflowRunner.fromOptions(options), Collections.emptyList()) .getJob(); assertAllStepOutputsHaveUniqueIds(job); http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index a788077..a4031d1 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -951,7 +951,7 @@ public class DataflowRunnerTest { thrown.expectMessage(Matchers.containsString("no translator registered")); DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowRunner) p.getRunner(), Collections.emptyList()); + p, DataflowRunner.fromOptions(options), 
Collections.emptyList()); ArgumentCaptor jobCaptor = ArgumentCaptor.forClass(Job.class); Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture()); @@ -989,7 +989,7 @@ public class DataflowRunnerTest { }); translator.translate( - p, (DataflowRunner) p.getRunner(), Collections.emptyList()); + p, DataflowRunner.fromOptions(options), Collections.emptyList()); assertTrue(transform.translated); } http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java index 1e906d2..d3eccbb 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java @@ -140,7 +140,7 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */, null /* additionalMetrics */)); assertEquals(mockJob, runner.run(p, mockRunner)); @@ -160,7 +160,7 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = 
TestDataflowRunner.fromOptions(options); when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, false /* tentative */, null /* additionalMetrics */)); try { @@ -202,7 +202,7 @@ public class TestDataflowRunnerTest { when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */, null /* additionalMetrics */)); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); try { runner.run(p, mockRunner); } catch (AssertionError expected) { @@ -233,7 +233,7 @@ public class TestDataflowRunnerTest { when(request.execute()) .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */, ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); runner.run(p, mockRunner); } @@ -254,7 +254,7 @@ public class TestDataflowRunnerTest { when(request.execute()) .thenReturn(generateMockStreamingMetricResponse( ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); runner.run(p, mockRunner); } @@ -275,7 +275,7 @@ public class TestDataflowRunnerTest { when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */, null /* additionalMetrics */)); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); try { runner.run(p, mockRunner); } catch (AssertionError expected) { @@ -350,7 +350,7 @@ public class TestDataflowRunnerTest { PCollection pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = 
TestDataflowRunner.fromOptions(options); doReturn(State.DONE).when(job).getState(); JobMetrics metrics = buildJobMetrics( generateMockMetrics(true /* success */, true /* tentative */)); @@ -364,7 +364,7 @@ public class TestDataflowRunnerTest { PCollection pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); doReturn(State.DONE).when(job).getState(); JobMetrics metrics = buildJobMetrics( generateMockMetrics(false /* success */, true /* tentative */)); @@ -392,7 +392,7 @@ public class TestDataflowRunnerTest { Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( ImmutableMap.of("no-watermark", new BigDecimal(100)))); doReturn(State.RUNNING).when(job).getState(); @@ -405,7 +405,7 @@ public class TestDataflowRunnerTest { Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); doReturn(State.RUNNING).when(job).getState(); @@ -418,7 +418,7 @@ public class TestDataflowRunnerTest { Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( ImmutableMap.of(LEGACY_WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); doReturn(State.RUNNING).when(job).getState(); @@ -431,7 +431,7 @@ public class 
TestDataflowRunnerTest { Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics (ImmutableMap.of(WATERMARK_METRIC_SUFFIX, new BigDecimal(100)))); doReturn(State.RUNNING).when(job).getState(); @@ -444,7 +444,7 @@ public class TestDataflowRunnerTest { Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK, "two" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); @@ -458,7 +458,7 @@ public class TestDataflowRunnerTest { Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK, "two" + WATERMARK_METRIC_SUFFIX, new BigDecimal(100)))); @@ -472,7 +472,7 @@ public class TestDataflowRunnerTest { Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK, "no-watermark", new BigDecimal(100)))); @@ -487,7 +487,7 @@ public class TestDataflowRunnerTest { PCollection pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); - TestDataflowRunner runner = (TestDataflowRunner) 
p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); doReturn(State.FAILED).when(job).getState(); assertEquals(Optional.of(false), runner.checkForPAssertSuccess(job, null /* metrics */)); } @@ -522,7 +522,7 @@ public class TestDataflowRunnerTest { when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */, null /* additionalMetrics */)); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); try { runner.run(p, mockRunner); } catch (AssertionError expected) { @@ -543,7 +543,7 @@ public class TestDataflowRunnerTest { when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */, null /* additionalMetrics */)); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); JobMetrics metrics = runner.getJobMetrics(job); assertEquals(1, metrics.getMetrics().size()); @@ -558,7 +558,7 @@ public class TestDataflowRunnerTest { p.apply(Create.of(1, 2, 3)); when(request.execute()).thenThrow(new IOException()); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); assertNull(runner.getJobMetrics(job)); } @@ -576,7 +576,7 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); p.getOptions().as(TestPipelineOptions.class) .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); @@ -600,7 +600,7 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner 
runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); p.getOptions().as(TestPipelineOptions.class) .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); @@ -627,7 +627,7 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); @@ -651,7 +651,7 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); @@ -678,7 +678,7 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestFailureMatcher()); @@ -709,7 +709,7 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestFailureMatcher()); 
http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index f09f2b4..2f368b1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -456,13 +456,6 @@ public class Pipeline { } /** - * Returns the configured {@link PipelineRunner}. - */ - public PipelineRunner getRunner() { - return runner; - } - - /** * Returns the configured {@link PipelineOptions}. * * @deprecated see BEAM-818 Remove Pipeline.getPipelineOptions. Configuration should be explicitly