From: tgroh@apache.org
To: commits@beam.incubator.apache.org
Reply-To: dev@beam.incubator.apache.org
Date: Tue, 06 Dec 2016 18:47:06 -0000
Message-Id: <944cbb9c9af84d12bdbee479eaaf720f@git.apache.org>
In-Reply-To: <93adaf8c185d4c07977106067c038ed7@git.apache.org>
References: <93adaf8c185d4c07977106067c038ed7@git.apache.org>
Subject: [3/4] incubator-beam git commit: Add DirectGraphs to DirectRunner Tests
archived-at: Tue, 06 Dec 2016 18:49:06 -0000

Add DirectGraphs to DirectRunner Tests

Add getGraph(Pipeline) and getProducer(PValue), which use the
DirectGraphVisitor and DirectGraph methods to provide access to the
producing AppliedPTransform.

Remove getProducingTransformInternal from everywhere except
DirectGraphVisitorTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6c6ad37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6c6ad37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6c6ad37

Branch: refs/heads/master
Commit: d6c6ad37149622e4d35af39727cdf774e6263d1e
Parents: 077d911
Author: Thomas Groh
Authored: Fri Dec 2 10:56:36 2016 -0800
Committer: Thomas Groh
Committed: Tue Dec 6 10:46:39 2016 -0800

----------------------------------------------------------------------
 .../direct/BoundedReadEvaluatorFactoryTest.java |  18 +-
 .../runners/direct/DirectGraphVisitorTest.java  |   1 +
 .../beam/runners/direct/DirectGraphs.java       |  35 +++
 .../runners/direct/EvaluationContextTest.java   |  82 ++++---
 .../direct/FlattenEvaluatorFactoryTest.java     |  15 +-
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |   2 +-
 .../GroupByKeyOnlyEvaluatorFactoryTest.java     |   3 +-
 .../ImmutabilityEnforcementFactoryTest.java     |   2 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   3 +-
 .../StatefulParDoEvaluatorFactoryTest.java      |   4 +-
 .../runners/direct/StepTransformResultTest.java |   2 +-
 .../direct/TestStreamEvaluatorFactoryTest.java  |  14 +-
 .../runners/direct/TransformExecutorTest.java   |   9 +-
 .../UnboundedReadEvaluatorFactoryTest.java      |  24 +-
 .../direct/ViewEvaluatorFactoryTest.java        |   4 +-
 .../direct/WatermarkCallbackExecutorTest.java   |   6 +-
 .../runners/direct/WatermarkManagerTest.java    | 237 ++++++++-----------
 17 files changed, 246 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java index b1ff689..acb1444 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java @@ -80,6 +80,7 @@ public class BoundedReadEvaluatorFactoryTest { private BoundedReadEvaluatorFactory factory; @Mock private EvaluationContext context; private BundleFactory bundleFactory; + private AppliedPTransform longsProducer; @Before public void setup() { @@ -92,6 +93,7 @@ public class BoundedReadEvaluatorFactoryTest { new BoundedReadEvaluatorFactory( context, Long.MAX_VALUE /* minimum size for dynamic splits */); bundleFactory = ImmutableListBundleFactory.create(); + longsProducer = DirectGraphs.getProducer(longs); } @Test @@ -102,11 +104,11 @@ public class BoundedReadEvaluatorFactoryTest { Collection> initialInputs = new BoundedReadEvaluatorFactory.InputProvider(context) - .getInitialInputs(longs.getProducingTransformInternal(), 1); + .getInitialInputs(longsProducer, 1); List> outputs = new ArrayList<>(); for (CommittedBundle shardBundle : initialInputs) { TransformEvaluator evaluator = - factory.forApplication(longs.getProducingTransformInternal(), null); + factory.forApplication(longsProducer, null); for (WindowedValue shard : shardBundle.getElements()) { evaluator.processElement((WindowedValue) shard); } @@ -141,7 +143,7 @@ public class BoundedReadEvaluatorFactoryTest { } PCollection read = TestPipeline.create().apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems))); - AppliedPTransform transform = read.getProducingTransformInternal(); + AppliedPTransform transform = DirectGraphs.getProducer(read); Collection> unreadInputs = new 
BoundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(transform, 1); @@ -191,7 +193,7 @@ public class BoundedReadEvaluatorFactoryTest { PCollection read = TestPipeline.create() .apply(Read.from(SourceTestUtils.toUnsplittableSource(CountingSource.upTo(10L)))); - AppliedPTransform transform = read.getProducingTransformInternal(); + AppliedPTransform transform = DirectGraphs.getProducer(read); when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); @@ -238,7 +240,7 @@ public class BoundedReadEvaluatorFactoryTest { }); Collection> initialInputs = new BoundedReadEvaluatorFactory.InputProvider(context) - .getInitialInputs(longs.getProducingTransformInternal(), 3); + .getInitialInputs(longsProducer, 3); assertThat(initialInputs, hasSize(allOf(greaterThanOrEqualTo(3), lessThanOrEqualTo(4)))); @@ -271,7 +273,7 @@ public class BoundedReadEvaluatorFactoryTest { CommittedBundle> shards = rootBundle.commit(Instant.now()); TransformEvaluator> evaluator = - factory.forApplication(longs.getProducingTransformInternal(), shards); + factory.forApplication(longsProducer, shards); for (WindowedValue> shard : shards.getElements()) { UncommittedBundle outputBundle = bundleFactory.createBundle(longs); when(context.createBundle(longs)).thenReturn(outputBundle); @@ -299,7 +301,7 @@ public class BoundedReadEvaluatorFactoryTest { TestPipeline p = TestPipeline.create(); PCollection pcollection = p.apply(Read.from(source)); - AppliedPTransform sourceTransform = pcollection.getProducingTransformInternal(); + AppliedPTransform sourceTransform = DirectGraphs.getProducer(pcollection); UncommittedBundle output = bundleFactory.createBundle(pcollection); when(context.createBundle(pcollection)).thenReturn(output); @@ -320,7 +322,7 @@ public class BoundedReadEvaluatorFactoryTest { TestPipeline p = TestPipeline.create(); PCollection pcollection = p.apply(Read.from(source)); - 
AppliedPTransform sourceTransform = pcollection.getProducingTransformInternal();
+    AppliedPTransform sourceTransform = DirectGraphs.getProducer(pcollection);
     UncommittedBundle output = bundleFactory.createBundle(pcollection);
     when(context.createBundle(pcollection)).thenReturn(output);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index d218a81..fb84de8 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -48,6 +48,7 @@ import org.junit.runners.JUnit4;
 /**
  * Tests for {@link DirectGraphVisitor}.
  */
+// TODO: Replace uses of getProducing
 @RunWith(JUnit4.class)
 public class DirectGraphVisitorTest implements Serializable {
   @Rule public transient ExpectedException thrown = ExpectedException.none();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
new file mode 100644
index 0000000..73ada19
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.values.PValue;
+
+/** Test utilities for the {@link DirectRunner}. */
+final class DirectGraphs {
+  public static DirectGraph getGraph(Pipeline p) {
+    DirectGraphVisitor visitor = new DirectGraphVisitor();
+    p.traverseTopologically(visitor);
+    return visitor.getGraph();
+  }
+
+  public static AppliedPTransform getProducer(PValue value) {
+    return getGraph(value.getPipeline()).getProducer(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 17cdea1..a2bb15e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -86,9 +86,13 @@ public class EvaluationContextTest {
   private
PCollectionView> view; private PCollection unbounded; - private BundleFactory bundleFactory; private DirectGraph graph; + private AppliedPTransform createdProducer; + private AppliedPTransform downstreamProducer; + private AppliedPTransform viewProducer; + private AppliedPTransform unboundedProducer; + @Before public void setup() { DirectRunner runner = @@ -101,14 +105,16 @@ public class EvaluationContextTest { view = created.apply(View.asIterable()); unbounded = p.apply(CountingInput.unbounded()); - DirectGraphVisitor graphVisitor = new DirectGraphVisitor(); - p.traverseTopologically(graphVisitor); - - bundleFactory = ImmutableListBundleFactory.create(); - graph = graphVisitor.getGraph(); + BundleFactory bundleFactory = ImmutableListBundleFactory.create(); + graph = DirectGraphs.getGraph(p); context = EvaluationContext.create( runner.getPipelineOptions(), NanosOffsetClock.create(), bundleFactory, graph); + + createdProducer = graph.getProducer(created); + downstreamProducer = graph.getProducer(downstream); + viewProducer = graph.getProducer(view); + unboundedProducer = graph.getProducer(unbounded); } @Test @@ -146,7 +152,7 @@ public class EvaluationContextTest { @Test public void getExecutionContextSameStepSameKeyState() { DirectExecutionContext fooContext = - context.getExecutionContext(created.getProducingTransformInternal(), + context.getExecutionContext(createdProducer, StructuralKey.of("foo", StringUtf8Coder.of())); StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); @@ -159,12 +165,12 @@ public class EvaluationContextTest { .createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), created) .commit(Instant.now()), ImmutableList.of(), - StepTransformResult.withoutHold(created.getProducingTransformInternal()) + StepTransformResult.withoutHold(createdProducer) .withState(stepContext.commitState()) .build()); DirectExecutionContext secondFooContext = - context.getExecutionContext(created.getProducingTransformInternal(), + 
context.getExecutionContext(createdProducer, StructuralKey.of("foo", StringUtf8Coder.of())); assertThat( secondFooContext @@ -179,7 +185,7 @@ public class EvaluationContextTest { @Test public void getExecutionContextDifferentKeysIndependentState() { DirectExecutionContext fooContext = - context.getExecutionContext(created.getProducingTransformInternal(), + context.getExecutionContext(createdProducer, StructuralKey.of("foo", StringUtf8Coder.of())); StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); @@ -191,7 +197,7 @@ public class EvaluationContextTest { .add(1); DirectExecutionContext barContext = - context.getExecutionContext(created.getProducingTransformInternal(), + context.getExecutionContext(createdProducer, StructuralKey.of("bar", StringUtf8Coder.of())); assertThat(barContext, not(equalTo(fooContext))); assertThat( @@ -207,7 +213,7 @@ public class EvaluationContextTest { public void getExecutionContextDifferentStepsIndependentState() { StructuralKey myKey = StructuralKey.of("foo", StringUtf8Coder.of()); DirectExecutionContext fooContext = - context.getExecutionContext(created.getProducingTransformInternal(), myKey); + context.getExecutionContext(createdProducer, myKey); StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); @@ -218,7 +224,7 @@ public class EvaluationContextTest { .add(1); DirectExecutionContext barContext = - context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + context.getExecutionContext(downstreamProducer, myKey); assertThat( barContext .getOrCreateStepContext("s1", "s1") @@ -232,15 +238,15 @@ public class EvaluationContextTest { public void handleResultCommitsAggregators() { Class fn = getClass(); DirectExecutionContext fooContext = - context.getExecutionContext(created.getProducingTransformInternal(), null); + context.getExecutionContext(createdProducer, null); DirectExecutionContext.StepContext stepContext = fooContext.createStepContext( - "STEP", 
created.getProducingTransformInternal().getTransform().getName()); + "STEP", createdProducer.getTransform().getName()); AggregatorContainer container = context.getAggregatorContainer(); AggregatorContainer.Mutator mutator = container.createMutator(); mutator.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(4L); TransformResult result = - StepTransformResult.withoutHold(created.getProducingTransformInternal()) + StepTransformResult.withoutHold(createdProducer) .withAggregatorChanges(mutator) .build(); context.handleResult(null, ImmutableList.of(), result); @@ -250,7 +256,7 @@ public class EvaluationContextTest { mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(12L); TransformResult secondResult = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + StepTransformResult.withoutHold(downstreamProducer) .withAggregatorChanges(mutatorAgain) .build(); context.handleResult( @@ -264,7 +270,7 @@ public class EvaluationContextTest { public void handleResultStoresState() { StructuralKey myKey = StructuralKey.of("foo".getBytes(), ByteArrayCoder.of()); DirectExecutionContext fooContext = - context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + context.getExecutionContext(downstreamProducer, myKey); StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); @@ -276,7 +282,7 @@ public class EvaluationContextTest { bag.add(4); TransformResult stateResult = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + StepTransformResult.withoutHold(downstreamProducer) .withState(state) .build(); @@ -286,7 +292,7 @@ public class EvaluationContextTest { stateResult); DirectExecutionContext afterResultContext = - context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + context.getExecutionContext(downstreamProducer, myKey); CopyOnAccessInMemoryStateInternals afterResultState = afterResultContext.getOrCreateStepContext("s1", 
"s1").stateInternals(); @@ -309,7 +315,7 @@ public class EvaluationContextTest { downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); TransformResult result = - StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) + StepTransformResult.withHold(createdProducer, new Instant(0)) .build(); context.handleResult(null, ImmutableList.of(), result); @@ -318,7 +324,7 @@ public class EvaluationContextTest { assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false)); TransformResult finishedResult = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + StepTransformResult.withoutHold(createdProducer).build(); context.handleResult(null, ImmutableList.of(), finishedResult); context.forceRefresh(); // Obtain the value via blocking call @@ -328,7 +334,7 @@ public class EvaluationContextTest { @Test public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception { TransformResult finishedResult = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + StepTransformResult.withoutHold(createdProducer).build(); context.handleResult(null, ImmutableList.of(), finishedResult); final CountDownLatch callLatch = new CountDownLatch(1); @@ -348,7 +354,7 @@ public class EvaluationContextTest { @Test public void extractFiredTimersExtractsTimers() { TransformResult holdResult = - StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) + StepTransformResult.withHold(createdProducer, new Instant(0)) .build(); context.handleResult(null, ImmutableList.of(), holdResult); @@ -356,7 +362,7 @@ public class EvaluationContextTest { TimerData toFire = TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME); TransformResult timerResult = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + StepTransformResult.withoutHold(downstreamProducer) 
.withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null)) .withTimerUpdate(TimerUpdate.builder(key).setTimer(toFire).build()) .build(); @@ -372,7 +378,7 @@ public class EvaluationContextTest { assertThat(context.extractFiredTimers(), emptyIterable()); TransformResult advanceResult = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + StepTransformResult.withoutHold(createdProducer).build(); // Should cause the downstream timer to fire context.handleResult(null, ImmutableList.of(), advanceResult); @@ -403,14 +409,14 @@ public class EvaluationContextTest { @Test public void isDoneWithUnboundedPCollectionAndShutdown() { context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true); - assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); + assertThat(context.isDone(unboundedProducer), is(false)); context.handleResult( null, ImmutableList.of(), - StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + StepTransformResult.withoutHold(unboundedProducer).build()); context.extractFiredTimers(); - assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(true)); + assertThat(context.isDone(unboundedProducer), is(true)); } @Test @@ -428,14 +434,14 @@ public class EvaluationContextTest { @Test public void isDoneWithOnlyBoundedPCollections() { context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); - assertThat(context.isDone(created.getProducingTransformInternal()), is(false)); + assertThat(context.isDone(createdProducer), is(false)); context.handleResult( null, ImmutableList.of(), - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build()); + StepTransformResult.withoutHold(createdProducer).build()); context.extractFiredTimers(); - assertThat(context.isDone(created.getProducingTransformInternal()), is(true)); + assertThat(context.isDone(createdProducer), is(true)); } @Test @@ -449,7 
+455,7 @@ public class EvaluationContextTest { context.handleResult( null, ImmutableList.of(), - StepTransformResult.withoutHold(created.getProducingTransformInternal()) + StepTransformResult.withoutHold(createdProducer) .addOutput(rootBundle) .build()); @SuppressWarnings("unchecked") @@ -458,7 +464,7 @@ public class EvaluationContextTest { context.handleResult( null, ImmutableList.of(), - StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + StepTransformResult.withoutHold(unboundedProducer).build()); assertThat(context.isDone(), is(false)); for (AppliedPTransform consumers : graph.getPrimitiveConsumers(created)) { @@ -479,22 +485,22 @@ public class EvaluationContextTest { context.handleResult( null, ImmutableList.of(), - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build()); + StepTransformResult.withoutHold(createdProducer).build()); context.handleResult( null, ImmutableList.of(), - StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + StepTransformResult.withoutHold(unboundedProducer).build()); context.handleResult( context.createBundle(created).commit(Instant.now()), ImmutableList.of(), - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build()); + StepTransformResult.withoutHold(downstreamProducer).build()); context.extractFiredTimers(); assertThat(context.isDone(), is(false)); context.handleResult( context.createBundle(created).commit(Instant.now()), ImmutableList.of(), - StepTransformResult.withoutHold(view.getProducingTransformInternal()).build()); + StepTransformResult.withoutHold(viewProducer).build()); context.extractFiredTimers(); assertThat(context.isDone(), is(false)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java index cb27fbc..9e22c36 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java @@ -47,6 +47,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class FlattenEvaluatorFactoryTest { private BundleFactory bundleFactory = ImmutableListBundleFactory.create(); + @Test public void testFlattenInMemoryEvaluator() throws Exception { TestPipeline p = TestPipeline.create(); @@ -69,10 +70,11 @@ public class FlattenEvaluatorFactoryTest { when(context.createBundle(flattened)).thenReturn(flattenedLeftBundle, flattenedRightBundle); FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(context); + AppliedPTransform flattenedProducer = DirectGraphs.getProducer(flattened); TransformEvaluator leftSideEvaluator = - factory.forApplication(flattened.getProducingTransformInternal(), leftBundle); + factory.forApplication(flattenedProducer, leftBundle); TransformEvaluator rightSideEvaluator = - factory.forApplication(flattened.getProducingTransformInternal(), rightBundle); + factory.forApplication(flattenedProducer, rightBundle); leftSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(1)); rightSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1)); @@ -92,13 +94,13 @@ public class FlattenEvaluatorFactoryTest { Matchers.>contains(flattenedRightBundle)); assertThat( rightSideResult.getTransform(), - Matchers.>equalTo(flattened.getProducingTransformInternal())); + Matchers.>equalTo(flattenedProducer)); assertThat( leftSideResult.getOutputBundles(), Matchers.>contains(flattenedLeftBundle)); assertThat( leftSideResult.getTransform(), - 
Matchers.>equalTo(flattened.getProducingTransformInternal())); + Matchers.>equalTo(flattenedProducer)); assertThat( flattenedLeftBundle.commit(Instant.now()).getElements(), @@ -126,9 +128,10 @@ public class FlattenEvaluatorFactoryTest { .thenReturn(bundleFactory.createBundle(flattened)); FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(evaluationContext); + AppliedPTransform flattendProducer = DirectGraphs.getProducer(flattened); TransformEvaluator emptyEvaluator = factory.forApplication( - flattened.getProducingTransformInternal(), + flattendProducer, bundleFactory.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE)); TransformResult leftSideResult = emptyEvaluator.finishBundle(); @@ -138,7 +141,7 @@ public class FlattenEvaluatorFactoryTest { assertThat(outputBundle.getElements(), emptyIterable()); assertThat( leftSideResult.getTransform(), - Matchers.>equalTo(flattened.getProducingTransformInternal())); + Matchers.>equalTo(flattendProducer)); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java index 7ba38ce..f0b29f0 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java @@ -97,7 +97,7 @@ public class GroupByKeyEvaluatorFactoryTest { ((KvCoder) values.getCoder()).getKeyCoder(); TransformEvaluator> evaluator = new GroupByKeyOnlyEvaluatorFactory(evaluationContext) - .forApplication(groupedKvs.getProducingTransformInternal(), inputBundle); + 
.forApplication(DirectGraphs.getProducer(groupedKvs), inputBundle);

     evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo));
     evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
index 23340c6..7efdb3d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
@@ -90,8 +90,7 @@ public class GroupByKeyOnlyEvaluatorFactoryTest {
         ((KvCoder) values.getCoder()).getKeyCoder();
     TransformEvaluator> evaluator =
         new GroupByKeyOnlyEvaluatorFactory(evaluationContext)
-            .forApplication(
-                groupedKvs.getProducingTransformInternal(), inputBundle);
+            .forApplication(DirectGraphs.getProducer(groupedKvs), inputBundle);

     evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo));
     evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index a65cd30..1ad6ba6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -64,7 +64,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
                     c.element()[0] = 'b';
                   }
                 }));
-    consumer = pcollection.apply(Count.globally()).getProducingTransformInternal();
+    consumer = DirectGraphs.getProducer(pcollection.apply(Count.globally()));
   }

   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 85e99c5..d48ac14 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -152,8 +152,9 @@ public class ParDoEvaluatorTest {
     when(evaluationContext.getAggregatorContainer()).thenReturn(container);
     when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);

+    @SuppressWarnings("unchecked")
     AppliedPTransform, ?, ?> transform =
-        (AppliedPTransform, ?, ?>) output.getProducingTransformInternal();
+        (AppliedPTransform, ?, ?>) DirectGraphs.getProducer(output);
     return ParDoEvaluator.create(
         evaluationContext,
         stepContext,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index ecf11ed..06c85ef 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -129,7 +129,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
     AppliedPTransform<
             PCollection>>, PCollectionTuple,
             StatefulParDo>
-        producingTransform = (AppliedPTransform) produced.getProducingTransformInternal();
+        producingTransform = (AppliedPTransform) DirectGraphs.getProducer(produced);

     // Then there will be a digging down to the step context to get the state internals
     when(mockEvaluationContext.getExecutionContext(
@@ -239,7 +239,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
     AppliedPTransform<
             PCollection>>, PCollectionTuple,
             StatefulParDo>
-        producingTransform = (AppliedPTransform) produced.getProducingTransformInternal();
+        producingTransform = (AppliedPTransform) DirectGraphs.getProducer(produced);

     // Then there will be a digging down to the step context to get the state internals
     when(mockEvaluationContext.getExecutionContext(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
index a21d8f7..d3a2cca 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
@@ -48,7 +48,7 @@ public class StepTransformResultTest {
   public void setup() {
     TestPipeline p = TestPipeline.create();
     pc = p.apply(Create.of(1, 2, 3));
-    transform = pc.getProducingTransformInternal();
+    transform = DirectGraphs.getGraph(p).getProducer(pc);

     bundleFactory = ImmutableListBundleFactory.create();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
index 3d31df6..6bb8623 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.TestStreamIndex
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
@@ -80,15 +81,16 @@ public class TestStreamEvaluatorFactoryTest {
     when(context.createBundle(streamVals))
         .thenReturn(bundleFactory.createBundle(streamVals), bundleFactory.createBundle(streamVals));

+    AppliedPTransform streamProducer = DirectGraphs.getProducer(streamVals);
     Collection> initialInputs =
         new TestStreamEvaluatorFactory.InputProvider(context)
-            .getInitialInputs(streamVals.getProducingTransformInternal(), 1);
+            .getInitialInputs(streamProducer, 1);
     @SuppressWarnings("unchecked")
     CommittedBundle> initialBundle =
         (CommittedBundle>) Iterables.getOnlyElement(initialInputs);

     TransformEvaluator> firstEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), initialBundle);
+        factory.forApplication(streamProducer, initialBundle);
     firstEvaluator.processElement(Iterables.getOnlyElement(initialBundle.getElements()));
     TransformResult> firstResult = firstEvaluator.finishBundle();

@@ -101,7 +103,7 @@ public class TestStreamEvaluatorFactoryTest {
     CommittedBundle> secondBundle =
         initialBundle.withElements(Collections.singleton(firstResidual));
     TransformEvaluator> secondEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), secondBundle);
+        factory.forApplication(streamProducer, secondBundle);
     secondEvaluator.processElement(firstResidual);
     TransformResult> secondResult = secondEvaluator.finishBundle();

@@ -114,7 +116,7 @@ public class TestStreamEvaluatorFactoryTest {
     CommittedBundle> thirdBundle =
         secondBundle.withElements(Collections.singleton(secondResidual));
     TransformEvaluator> thirdEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), thirdBundle);
+        factory.forApplication(streamProducer, thirdBundle);
     thirdEvaluator.processElement(secondResidual);
     TransformResult> thirdResult = thirdEvaluator.finishBundle();

@@ -128,7 +130,7 @@ public class TestStreamEvaluatorFactoryTest {
     CommittedBundle> fourthBundle =
         thirdBundle.withElements(Collections.singleton(thirdResidual));
     TransformEvaluator> fourthEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), fourthBundle);
+        factory.forApplication(streamProducer, fourthBundle);
     fourthEvaluator.processElement(thirdResidual);
     TransformResult> fourthResult = fourthEvaluator.finishBundle();

@@ -142,7 +144,7 @@ public class TestStreamEvaluatorFactoryTest {
     CommittedBundle> fifthBundle =
         thirdBundle.withElements(Collections.singleton(fourthResidual));
     TransformEvaluator> fifthEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), fifthBundle);
+        factory.forApplication(streamProducer, fifthBundle);
     fifthEvaluator.processElement(fourthResidual);
     TransformResult> fifthResult = fifthEvaluator.finishBundle();


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 08b1e18..4ad22bc 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -89,8 +89,9 @@ public class TransformExecutorTest {
     created = p.apply(Create.of("foo", "spam", "third"));
     PCollection> downstream = created.apply(WithKeys.of(3));

-    createdProducer = created.getProducingTransformInternal();
-    downstreamProducer = downstream.getProducingTransformInternal();
+    DirectGraph graph = DirectGraphs.getGraph(p);
+    createdProducer = graph.getProducer(created);
+    downstreamProducer = graph.getProducer(downstream);

     when(evaluationContext.getMetrics()).thenReturn(metrics);
   }
@@ -317,7 +318,7 @@ public class TransformExecutorTest {
   @Test
   public void callWithEnforcementThrowsOnFinishPropagates() throws Exception {
     final TransformResult result =
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+        StepTransformResult.withoutHold(createdProducer).build();

     TransformEvaluator evaluator =
         new TransformEvaluator() {
@@ -356,7 +357,7 @@ public class TransformExecutorTest {
   @Test
   public void callWithEnforcementThrowsOnElementPropagates() throws Exception {
     final TransformResult result =
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+        StepTransformResult.withoutHold(createdProducer).build();

     TransformEvaluator evaluator =
         new TransformEvaluator() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 5a10134..dd36a2f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;

+import static org.apache.beam.runners.direct.DirectGraphs.getProducer;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -90,6 +91,7 @@ public class UnboundedReadEvaluatorFactoryTest {
   private BundleFactory bundleFactory = ImmutableListBundleFactory.create();

   private UnboundedSource source;
+  private DirectGraph graph;

   @Before
   public void setup() {
@@ -100,6 +102,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     context = mock(EvaluationContext.class);
     factory = new UnboundedReadEvaluatorFactory(context);
     output = bundleFactory.createBundle(longs);
+    graph = DirectGraphs.getGraph(p);
     when(context.createBundle(longs)).thenReturn(output);
   }

@@ -115,7 +118,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     int numSplits = 5;
     Collection> initialInputs =
         new UnboundedReadEvaluatorFactory.InputProvider(context)
-            .getInitialInputs(longs.getProducingTransformInternal(), numSplits);
+            .getInitialInputs(graph.getProducer(longs), numSplits);
     // CountingSource.unbounded has very good splitting behavior
     assertThat(initialInputs, hasSize(numSplits));

@@ -148,15 +151,14 @@ public class UnboundedReadEvaluatorFactoryTest {

     Collection> initialInputs =
         new UnboundedReadEvaluatorFactory.InputProvider(context)
-            .getInitialInputs(longs.getProducingTransformInternal(), 1);
+            .getInitialInputs(graph.getProducer(longs), 1);

     CommittedBundle inputShards = Iterables.getOnlyElement(initialInputs);
     UnboundedSourceShard inputShard =
         (UnboundedSourceShard)
             Iterables.getOnlyElement(inputShards.getElements()).getValue();
     TransformEvaluator> evaluator =
-        factory.forApplication(
-            longs.getProducingTransformInternal(), inputShards);
+        factory.forApplication(graph.getProducer(longs), inputShards);

     evaluator.processElement((WindowedValue) Iterables.getOnlyElement(inputShards.getElements()));
     TransformResult> result = evaluator.finishBundle();
@@ -190,7 +192,7 @@ public class UnboundedReadEvaluatorFactoryTest {

     TestPipeline p = TestPipeline.create();
     PCollection pcollection = p.apply(Read.from(source));
-    AppliedPTransform sourceTransform = pcollection.getProducingTransformInternal();
+    AppliedPTransform sourceTransform = getProducer(pcollection);

     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     Collection> initialInputs =
@@ -233,7 +235,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     // Read with a very slow rate so by the second read there are no more elements
     PCollection pcollection =
         p.apply(Read.from(new TestUnboundedSource<>(VarLongCoder.of(), 1L)));
-    AppliedPTransform sourceTransform = pcollection.getProducingTransformInternal();
+    AppliedPTransform sourceTransform = DirectGraphs.getProducer(pcollection);

     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     Collection> initialInputs =
@@ -291,7 +293,9 @@ public class UnboundedReadEvaluatorFactoryTest {

     TestPipeline p = TestPipeline.create();
     PCollection pcollection = p.apply(Read.from(source));
-    AppliedPTransform sourceTransform = pcollection.getProducingTransformInternal();
+    DirectGraph graph = DirectGraphs.getGraph(p);
+    AppliedPTransform sourceTransform =
+        graph.getProducer(pcollection);

     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     UncommittedBundle output = bundleFactory.createBundle(pcollection);
@@ -307,8 +311,7 @@ public class UnboundedReadEvaluatorFactoryTest {
             .commit(Instant.now());
     UnboundedReadEvaluatorFactory factory =
         new UnboundedReadEvaluatorFactory(context, 1.0 /* Always reuse */);
-    new UnboundedReadEvaluatorFactory.InputProvider(context)
-        .getInitialInputs(pcollection.getProducingTransformInternal(), 1);
+    new UnboundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(sourceTransform, 1);
     TransformEvaluator> evaluator =
         factory.forApplication(sourceTransform, inputBundle);
     evaluator.processElement(shard);
@@ -336,7 +339,8 @@ public class UnboundedReadEvaluatorFactoryTest {

     TestPipeline p = TestPipeline.create();
     PCollection pcollection = p.apply(Read.from(source));
-    AppliedPTransform sourceTransform = pcollection.getProducingTransformInternal();
+    AppliedPTransform sourceTransform =
+        DirectGraphs.getGraph(p).getProducer(pcollection);

     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     UncommittedBundle output = bundleFactory.createBundle(pcollection);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 7d14020..7c08009 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Values;
@@ -73,9 +74,10 @@ public class ViewEvaluatorFactoryTest {
     CommittedBundle inputBundle =
         bundleFactory.createBundle(input).commit(Instant.now());

+    AppliedPTransform producer = DirectGraphs.getProducer(view);
     TransformEvaluator> evaluator =
         new ViewEvaluatorFactory(context)
-            .forApplication(view.getProducingTransformInternal(), inputBundle);
+            .forApplication(producer, inputBundle);

     evaluator.processElement(
         WindowedValue.>valueInGlobalWindow(ImmutableList.of("foo", "bar")));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
index 1be9a98..acdabb6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
@@ -55,8 +55,10 @@ public class WatermarkCallbackExecutorTest {
   public void setup() {
     TestPipeline p = TestPipeline.create();
     PCollection created = p.apply(Create.of(1, 2, 3));
-    create = created.getProducingTransformInternal();
-    sum = created.apply(Sum.integersGlobally()).getProducingTransformInternal();
+    PCollection summed = created.apply(Sum.integersGlobally());
+    DirectGraph graph = DirectGraphs.getGraph(p);
+    create = graph.getProducer(created);
+    sum = graph.getProducer(summed);
   }

   @Test