beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [3/4] incubator-beam git commit: Add DirectGraphs to DirectRunner Tests
Date Tue, 06 Dec 2016 18:47:06 GMT
Add DirectGraphs to DirectRunner Tests

Add getGraph(Pipeline) and getProducer(PValue), which use the
DirectGraphVisitor and DirectGraph methods to provide access to the
producing AppliedPTransform.

Remove getProducingTransformInternal from everywhere except
DirectGraphVisitorTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6c6ad37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6c6ad37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6c6ad37

Branch: refs/heads/master
Commit: d6c6ad37149622e4d35af39727cdf774e6263d1e
Parents: 077d911
Author: Thomas Groh <tgroh@google.com>
Authored: Fri Dec 2 10:56:36 2016 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Tue Dec 6 10:46:39 2016 -0800

----------------------------------------------------------------------
 .../direct/BoundedReadEvaluatorFactoryTest.java |  18 +-
 .../runners/direct/DirectGraphVisitorTest.java  |   1 +
 .../beam/runners/direct/DirectGraphs.java       |  35 +++
 .../runners/direct/EvaluationContextTest.java   |  82 ++++---
 .../direct/FlattenEvaluatorFactoryTest.java     |  15 +-
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |   2 +-
 .../GroupByKeyOnlyEvaluatorFactoryTest.java     |   3 +-
 .../ImmutabilityEnforcementFactoryTest.java     |   2 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   3 +-
 .../StatefulParDoEvaluatorFactoryTest.java      |   4 +-
 .../runners/direct/StepTransformResultTest.java |   2 +-
 .../direct/TestStreamEvaluatorFactoryTest.java  |  14 +-
 .../runners/direct/TransformExecutorTest.java   |   9 +-
 .../UnboundedReadEvaluatorFactoryTest.java      |  24 +-
 .../direct/ViewEvaluatorFactoryTest.java        |   4 +-
 .../direct/WatermarkCallbackExecutorTest.java   |   6 +-
 .../runners/direct/WatermarkManagerTest.java    | 237 ++++++++-----------
 17 files changed, 246 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index b1ff689..acb1444 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -80,6 +80,7 @@ public class BoundedReadEvaluatorFactoryTest {
   private BoundedReadEvaluatorFactory factory;
   @Mock private EvaluationContext context;
   private BundleFactory bundleFactory;
+  private AppliedPTransform<?, ?, ?> longsProducer;
 
   @Before
   public void setup() {
@@ -92,6 +93,7 @@ public class BoundedReadEvaluatorFactoryTest {
         new BoundedReadEvaluatorFactory(
             context, Long.MAX_VALUE /* minimum size for dynamic splits */);
     bundleFactory = ImmutableListBundleFactory.create();
+    longsProducer = DirectGraphs.getProducer(longs);
   }
 
   @Test
@@ -102,11 +104,11 @@ public class BoundedReadEvaluatorFactoryTest {
 
     Collection<CommittedBundle<?>> initialInputs =
         new BoundedReadEvaluatorFactory.InputProvider(context)
-            .getInitialInputs(longs.getProducingTransformInternal(), 1);
+            .getInitialInputs(longsProducer, 1);
     List<WindowedValue<?>> outputs = new ArrayList<>();
     for (CommittedBundle<?> shardBundle : initialInputs) {
       TransformEvaluator<?> evaluator =
-          factory.forApplication(longs.getProducingTransformInternal(), null);
+          factory.forApplication(longsProducer, null);
       for (WindowedValue<?> shard : shardBundle.getElements()) {
         evaluator.processElement((WindowedValue) shard);
       }
@@ -141,7 +143,7 @@ public class BoundedReadEvaluatorFactoryTest {
     }
     PCollection<Long> read =
         TestPipeline.create().apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems)));
-    AppliedPTransform<?, ?, ?> transform = read.getProducingTransformInternal();
+    AppliedPTransform<?, ?, ?> transform = DirectGraphs.getProducer(read);
     Collection<CommittedBundle<?>> unreadInputs =
         new BoundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(transform, 1);
 
@@ -191,7 +193,7 @@ public class BoundedReadEvaluatorFactoryTest {
     PCollection<Long> read =
         TestPipeline.create()
             .apply(Read.from(SourceTestUtils.toUnsplittableSource(CountingSource.upTo(10L))));
-    AppliedPTransform<?, ?, ?> transform = read.getProducingTransformInternal();
+    AppliedPTransform<?, ?, ?> transform = DirectGraphs.getProducer(read);
 
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
@@ -238,7 +240,7 @@ public class BoundedReadEvaluatorFactoryTest {
             });
     Collection<CommittedBundle<?>> initialInputs =
         new BoundedReadEvaluatorFactory.InputProvider(context)
-            .getInitialInputs(longs.getProducingTransformInternal(), 3);
+            .getInitialInputs(longsProducer, 3);
 
     assertThat(initialInputs, hasSize(allOf(greaterThanOrEqualTo(3), lessThanOrEqualTo(4))));
 
@@ -271,7 +273,7 @@ public class BoundedReadEvaluatorFactoryTest {
     CommittedBundle<BoundedSourceShard<Long>> shards = rootBundle.commit(Instant.now());
 
     TransformEvaluator<BoundedSourceShard<Long>> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), shards);
+        factory.forApplication(longsProducer, shards);
     for (WindowedValue<BoundedSourceShard<Long>> shard : shards.getElements()) {
       UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(longs);
       when(context.createBundle(longs)).thenReturn(outputBundle);
@@ -299,7 +301,7 @@ public class BoundedReadEvaluatorFactoryTest {
 
     TestPipeline p = TestPipeline.create();
     PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+    AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection);
 
     UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
     when(context.createBundle(pcollection)).thenReturn(output);
@@ -320,7 +322,7 @@ public class BoundedReadEvaluatorFactoryTest {
 
     TestPipeline p = TestPipeline.create();
     PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+    AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection);
 
     UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
     when(context.createBundle(pcollection)).thenReturn(output);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index d218a81..fb84de8 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -48,6 +48,7 @@ import org.junit.runners.JUnit4;
 /**
  * Tests for {@link DirectGraphVisitor}.
  */
+// TODO: Replace uses of getProducing
 @RunWith(JUnit4.class)
 public class DirectGraphVisitorTest implements Serializable {
   @Rule public transient ExpectedException thrown = ExpectedException.none();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
new file mode 100644
index 0000000..73ada19
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.values.PValue;
+
+/** Test utilities for the {@link DirectRunner}. */
+final class DirectGraphs {
+  public static DirectGraph getGraph(Pipeline p) {
+    DirectGraphVisitor visitor = new DirectGraphVisitor();
+    p.traverseTopologically(visitor);
+    return visitor.getGraph();
+  }
+
+  public static AppliedPTransform<?, ?, ?> getProducer(PValue value) {
+    return getGraph(value.getPipeline()).getProducer(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 17cdea1..a2bb15e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -86,9 +86,13 @@ public class EvaluationContextTest {
   private PCollectionView<Iterable<Integer>> view;
   private PCollection<Long> unbounded;
 
-  private BundleFactory bundleFactory;
   private DirectGraph graph;
 
+  private AppliedPTransform<?, ?, ?> createdProducer;
+  private AppliedPTransform<?, ?, ?> downstreamProducer;
+  private AppliedPTransform<?, ?, ?> viewProducer;
+  private AppliedPTransform<?, ?, ?> unboundedProducer;
+
   @Before
   public void setup() {
     DirectRunner runner =
@@ -101,14 +105,16 @@ public class EvaluationContextTest {
     view = created.apply(View.<Integer>asIterable());
     unbounded = p.apply(CountingInput.unbounded());
 
-    DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
-    p.traverseTopologically(graphVisitor);
-
-    bundleFactory = ImmutableListBundleFactory.create();
-    graph = graphVisitor.getGraph();
+    BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+    graph = DirectGraphs.getGraph(p);
     context =
         EvaluationContext.create(
             runner.getPipelineOptions(), NanosOffsetClock.create(), bundleFactory, graph);
+
+    createdProducer = graph.getProducer(created);
+    downstreamProducer = graph.getProducer(downstream);
+    viewProducer = graph.getProducer(view);
+    unboundedProducer = graph.getProducer(unbounded);
   }
 
   @Test
@@ -146,7 +152,7 @@ public class EvaluationContextTest {
   @Test
   public void getExecutionContextSameStepSameKeyState() {
     DirectExecutionContext fooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(),
+        context.getExecutionContext(createdProducer,
             StructuralKey.of("foo", StringUtf8Coder.of()));
 
     StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
@@ -159,12 +165,12 @@ public class EvaluationContextTest {
             .createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), created)
             .commit(Instant.now()),
         ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(created.getProducingTransformInternal())
+        StepTransformResult.withoutHold(createdProducer)
             .withState(stepContext.commitState())
             .build());
 
     DirectExecutionContext secondFooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(),
+        context.getExecutionContext(createdProducer,
             StructuralKey.of("foo", StringUtf8Coder.of()));
     assertThat(
         secondFooContext
@@ -179,7 +185,7 @@ public class EvaluationContextTest {
   @Test
   public void getExecutionContextDifferentKeysIndependentState() {
     DirectExecutionContext fooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(),
+        context.getExecutionContext(createdProducer,
             StructuralKey.of("foo", StringUtf8Coder.of()));
 
     StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
@@ -191,7 +197,7 @@ public class EvaluationContextTest {
         .add(1);
 
     DirectExecutionContext barContext =
-        context.getExecutionContext(created.getProducingTransformInternal(),
+        context.getExecutionContext(createdProducer,
             StructuralKey.of("bar", StringUtf8Coder.of()));
     assertThat(barContext, not(equalTo(fooContext)));
     assertThat(
@@ -207,7 +213,7 @@ public class EvaluationContextTest {
   public void getExecutionContextDifferentStepsIndependentState() {
     StructuralKey<?> myKey = StructuralKey.of("foo", StringUtf8Coder.of());
     DirectExecutionContext fooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(), myKey);
+        context.getExecutionContext(createdProducer, myKey);
 
     StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
@@ -218,7 +224,7 @@ public class EvaluationContextTest {
         .add(1);
 
     DirectExecutionContext barContext =
-        context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
+        context.getExecutionContext(downstreamProducer, myKey);
     assertThat(
         barContext
             .getOrCreateStepContext("s1", "s1")
@@ -232,15 +238,15 @@ public class EvaluationContextTest {
   public void handleResultCommitsAggregators() {
     Class<?> fn = getClass();
     DirectExecutionContext fooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(), null);
+        context.getExecutionContext(createdProducer, null);
     DirectExecutionContext.StepContext stepContext = fooContext.createStepContext(
-        "STEP", created.getProducingTransformInternal().getTransform().getName());
+        "STEP", createdProducer.getTransform().getName());
     AggregatorContainer container = context.getAggregatorContainer();
     AggregatorContainer.Mutator mutator = container.createMutator();
     mutator.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(4L);
 
     TransformResult<?> result =
-        StepTransformResult.withoutHold(created.getProducingTransformInternal())
+        StepTransformResult.withoutHold(createdProducer)
             .withAggregatorChanges(mutator)
             .build();
     context.handleResult(null, ImmutableList.<TimerData>of(), result);
@@ -250,7 +256,7 @@ public class EvaluationContextTest {
     mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(12L);
 
     TransformResult<?> secondResult =
-        StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
+        StepTransformResult.withoutHold(downstreamProducer)
             .withAggregatorChanges(mutatorAgain)
             .build();
     context.handleResult(
@@ -264,7 +270,7 @@ public class EvaluationContextTest {
   public void handleResultStoresState() {
     StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(), ByteArrayCoder.of());
     DirectExecutionContext fooContext =
-        context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
+        context.getExecutionContext(downstreamProducer, myKey);
 
     StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
@@ -276,7 +282,7 @@ public class EvaluationContextTest {
     bag.add(4);
 
     TransformResult<?> stateResult =
-        StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
+        StepTransformResult.withoutHold(downstreamProducer)
             .withState(state)
             .build();
 
@@ -286,7 +292,7 @@ public class EvaluationContextTest {
         stateResult);
 
     DirectExecutionContext afterResultContext =
-        context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
+        context.getExecutionContext(downstreamProducer, myKey);
 
     CopyOnAccessInMemoryStateInternals<Object> afterResultState =
         afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals();
@@ -309,7 +315,7 @@ public class EvaluationContextTest {
         downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback);
 
     TransformResult<?> result =
-        StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0))
+        StepTransformResult.withHold(createdProducer, new Instant(0))
             .build();
 
     context.handleResult(null, ImmutableList.<TimerData>of(), result);
@@ -318,7 +324,7 @@ public class EvaluationContextTest {
     assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false));
 
     TransformResult<?> finishedResult =
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+        StepTransformResult.withoutHold(createdProducer).build();
     context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult);
     context.forceRefresh();
     // Obtain the value via blocking call
@@ -328,7 +334,7 @@ public class EvaluationContextTest {
   @Test
   public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception {
     TransformResult<?> finishedResult =
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+        StepTransformResult.withoutHold(createdProducer).build();
     context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult);
 
     final CountDownLatch callLatch = new CountDownLatch(1);
@@ -348,7 +354,7 @@ public class EvaluationContextTest {
   @Test
   public void extractFiredTimersExtractsTimers() {
     TransformResult<?> holdResult =
-        StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0))
+        StepTransformResult.withHold(createdProducer, new Instant(0))
             .build();
     context.handleResult(null, ImmutableList.<TimerData>of(), holdResult);
 
@@ -356,7 +362,7 @@ public class EvaluationContextTest {
     TimerData toFire =
         TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME);
     TransformResult<?> timerResult =
-        StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
+        StepTransformResult.withoutHold(downstreamProducer)
             .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null))
             .withTimerUpdate(TimerUpdate.builder(key).setTimer(toFire).build())
             .build();
@@ -372,7 +378,7 @@ public class EvaluationContextTest {
     assertThat(context.extractFiredTimers(), emptyIterable());
 
     TransformResult<?> advanceResult =
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+        StepTransformResult.withoutHold(createdProducer).build();
     // Should cause the downstream timer to fire
     context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult);
 
@@ -403,14 +409,14 @@ public class EvaluationContextTest {
   @Test
   public void isDoneWithUnboundedPCollectionAndShutdown() {
     context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
-    assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));
+    assertThat(context.isDone(unboundedProducer), is(false));
 
     context.handleResult(
         null,
         ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+        StepTransformResult.withoutHold(unboundedProducer).build());
     context.extractFiredTimers();
-    assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(true));
+    assertThat(context.isDone(unboundedProducer), is(true));
   }
 
   @Test
@@ -428,14 +434,14 @@ public class EvaluationContextTest {
   @Test
   public void isDoneWithOnlyBoundedPCollections() {
     context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
-    assertThat(context.isDone(created.getProducingTransformInternal()), is(false));
+    assertThat(context.isDone(createdProducer), is(false));
 
     context.handleResult(
         null,
         ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
+        StepTransformResult.withoutHold(createdProducer).build());
     context.extractFiredTimers();
-    assertThat(context.isDone(created.getProducingTransformInternal()), is(true));
+    assertThat(context.isDone(createdProducer), is(true));
   }
 
   @Test
@@ -449,7 +455,7 @@ public class EvaluationContextTest {
         context.handleResult(
             null,
             ImmutableList.<TimerData>of(),
-            StepTransformResult.<Integer>withoutHold(created.getProducingTransformInternal())
+            StepTransformResult.<Integer>withoutHold(createdProducer)
                 .addOutput(rootBundle)
                 .build());
     @SuppressWarnings("unchecked")
@@ -458,7 +464,7 @@ public class EvaluationContextTest {
     context.handleResult(
         null,
         ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+        StepTransformResult.withoutHold(unboundedProducer).build());
     assertThat(context.isDone(), is(false));
 
     for (AppliedPTransform<?, ?, ?> consumers : graph.getPrimitiveConsumers(created)) {
@@ -479,22 +485,22 @@ public class EvaluationContextTest {
     context.handleResult(
         null,
         ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
+        StepTransformResult.withoutHold(createdProducer).build());
     context.handleResult(
         null,
         ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+        StepTransformResult.withoutHold(unboundedProducer).build());
     context.handleResult(
         context.createBundle(created).commit(Instant.now()),
         ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
+        StepTransformResult.withoutHold(downstreamProducer).build());
     context.extractFiredTimers();
     assertThat(context.isDone(), is(false));
 
     context.handleResult(
         context.createBundle(created).commit(Instant.now()),
         ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
+        StepTransformResult.withoutHold(viewProducer).build());
     context.extractFiredTimers();
     assertThat(context.isDone(), is(false));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index cb27fbc..9e22c36 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -47,6 +47,7 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class FlattenEvaluatorFactoryTest {
   private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+
   @Test
   public void testFlattenInMemoryEvaluator() throws Exception {
     TestPipeline p = TestPipeline.create();
@@ -69,10 +70,11 @@ public class FlattenEvaluatorFactoryTest {
     when(context.createBundle(flattened)).thenReturn(flattenedLeftBundle, flattenedRightBundle);
 
     FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(context);
+    AppliedPTransform<?, ?, ?> flattenedProducer = DirectGraphs.getProducer(flattened);
     TransformEvaluator<Integer> leftSideEvaluator =
-        factory.forApplication(flattened.getProducingTransformInternal(), leftBundle);
+        factory.forApplication(flattenedProducer, leftBundle);
     TransformEvaluator<Integer> rightSideEvaluator =
-        factory.forApplication(flattened.getProducingTransformInternal(), rightBundle);
+        factory.forApplication(flattenedProducer, rightBundle);
 
     leftSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(1));
     rightSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1));
@@ -92,13 +94,13 @@ public class FlattenEvaluatorFactoryTest {
         Matchers.<UncommittedBundle<?>>contains(flattenedRightBundle));
     assertThat(
         rightSideResult.getTransform(),
-        Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattened.getProducingTransformInternal()));
+        Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattenedProducer));
     assertThat(
         leftSideResult.getOutputBundles(),
         Matchers.<UncommittedBundle<?>>contains(flattenedLeftBundle));
     assertThat(
         leftSideResult.getTransform(),
-        Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattened.getProducingTransformInternal()));
+        Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattenedProducer));
 
     assertThat(
         flattenedLeftBundle.commit(Instant.now()).getElements(),
@@ -126,9 +128,10 @@ public class FlattenEvaluatorFactoryTest {
         .thenReturn(bundleFactory.createBundle(flattened));
 
     FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(evaluationContext);
+    AppliedPTransform<?, ?, ?> flattendProducer = DirectGraphs.getProducer(flattened);
     TransformEvaluator<Integer> emptyEvaluator =
         factory.forApplication(
-            flattened.getProducingTransformInternal(),
+            flattendProducer,
             bundleFactory.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
 
     TransformResult<Integer> leftSideResult = emptyEvaluator.finishBundle();
@@ -138,7 +141,7 @@ public class FlattenEvaluatorFactoryTest {
     assertThat(outputBundle.getElements(), emptyIterable());
     assertThat(
         leftSideResult.getTransform(),
-        Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattened.getProducingTransformInternal()));
+        Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattendProducer));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index 7ba38ce..f0b29f0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -97,7 +97,7 @@ public class GroupByKeyEvaluatorFactoryTest {
         ((KvCoder<String, Integer>) values.getCoder()).getKeyCoder();
     TransformEvaluator<KV<String, Integer>> evaluator =
         new GroupByKeyOnlyEvaluatorFactory(evaluationContext)
-            .forApplication(groupedKvs.getProducingTransformInternal(), inputBundle);
+            .forApplication(DirectGraphs.getProducer(groupedKvs), inputBundle);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo));
     evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
index 23340c6..7efdb3d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
@@ -90,8 +90,7 @@ public class GroupByKeyOnlyEvaluatorFactoryTest {
         ((KvCoder<String, Integer>) values.getCoder()).getKeyCoder();
     TransformEvaluator<KV<String, Integer>> evaluator =
         new GroupByKeyOnlyEvaluatorFactory(evaluationContext)
-            .forApplication(
-                groupedKvs.getProducingTransformInternal(), inputBundle);
+            .forApplication(DirectGraphs.getProducer(groupedKvs), inputBundle);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo));
     evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index a65cd30..1ad6ba6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -64,7 +64,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
                         c.element()[0] = 'b';
                       }
                     }));
-    consumer = pcollection.apply(Count.<byte[]>globally()).getProducingTransformInternal();
+    consumer = DirectGraphs.getProducer(pcollection.apply(Count.<byte[]>globally()));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 85e99c5..d48ac14 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -152,8 +152,9 @@ public class ParDoEvaluatorTest {
     when(evaluationContext.getAggregatorContainer()).thenReturn(container);
     when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
+    @SuppressWarnings("unchecked")
     AppliedPTransform<PCollection<Integer>, ?, ?> transform =
-        (AppliedPTransform<PCollection<Integer>, ?, ?>) output.getProducingTransformInternal();
+        (AppliedPTransform<PCollection<Integer>, ?, ?>) DirectGraphs.getProducer(output);
     return ParDoEvaluator.create(
         evaluationContext,
         stepContext,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index ecf11ed..06c85ef 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -129,7 +129,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
     AppliedPTransform<
             PCollection<? extends KV<String, Iterable<Integer>>>, PCollectionTuple,
             StatefulParDo<String, Integer, Integer>>
-        producingTransform = (AppliedPTransform) produced.getProducingTransformInternal();
+        producingTransform = (AppliedPTransform) DirectGraphs.getProducer(produced);
 
     // Then there will be a digging down to the step context to get the state internals
     when(mockEvaluationContext.getExecutionContext(
@@ -239,7 +239,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
     AppliedPTransform<
             PCollection<KV<String, Iterable<Integer>>>, PCollectionTuple,
             StatefulParDo<String, Integer, Integer>>
-        producingTransform = (AppliedPTransform) produced.getProducingTransformInternal();
+        producingTransform = (AppliedPTransform) DirectGraphs.getProducer(produced);
 
     // Then there will be a digging down to the step context to get the state internals
     when(mockEvaluationContext.getExecutionContext(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
index a21d8f7..d3a2cca 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
@@ -48,7 +48,7 @@ public class StepTransformResultTest {
   public void setup() {
     TestPipeline p = TestPipeline.create();
     pc = p.apply(Create.of(1, 2, 3));
-    transform = pc.getProducingTransformInternal();
+    transform = DirectGraphs.getGraph(p).getProducer(pc);
 
     bundleFactory = ImmutableListBundleFactory.create();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
index 3d31df6..6bb8623 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.TestStreamIndex
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
@@ -80,15 +81,16 @@ public class TestStreamEvaluatorFactoryTest {
     when(context.createBundle(streamVals))
         .thenReturn(bundleFactory.createBundle(streamVals), bundleFactory.createBundle(streamVals));
 
+    AppliedPTransform<?, ?, ?> streamProducer = DirectGraphs.getProducer(streamVals);
     Collection<CommittedBundle<?>> initialInputs =
         new TestStreamEvaluatorFactory.InputProvider(context)
-            .getInitialInputs(streamVals.getProducingTransformInternal(), 1);
+            .getInitialInputs(streamProducer, 1);
     @SuppressWarnings("unchecked")
     CommittedBundle<TestStreamIndex<Integer>> initialBundle =
         (CommittedBundle<TestStreamIndex<Integer>>) Iterables.getOnlyElement(initialInputs);
 
     TransformEvaluator<TestStreamIndex<Integer>> firstEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), initialBundle);
+        factory.forApplication(streamProducer, initialBundle);
     firstEvaluator.processElement(Iterables.getOnlyElement(initialBundle.getElements()));
     TransformResult<TestStreamIndex<Integer>> firstResult = firstEvaluator.finishBundle();
 
@@ -101,7 +103,7 @@ public class TestStreamEvaluatorFactoryTest {
     CommittedBundle<TestStreamIndex<Integer>> secondBundle =
         initialBundle.withElements(Collections.singleton(firstResidual));
     TransformEvaluator<TestStreamIndex<Integer>> secondEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), secondBundle);
+        factory.forApplication(streamProducer, secondBundle);
     secondEvaluator.processElement(firstResidual);
     TransformResult<TestStreamIndex<Integer>> secondResult = secondEvaluator.finishBundle();
 
@@ -114,7 +116,7 @@ public class TestStreamEvaluatorFactoryTest {
     CommittedBundle<TestStreamIndex<Integer>> thirdBundle =
         secondBundle.withElements(Collections.singleton(secondResidual));
     TransformEvaluator<TestStreamIndex<Integer>> thirdEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), thirdBundle);
+        factory.forApplication(streamProducer, thirdBundle);
     thirdEvaluator.processElement(secondResidual);
     TransformResult<TestStreamIndex<Integer>> thirdResult = thirdEvaluator.finishBundle();
 
@@ -128,7 +130,7 @@ public class TestStreamEvaluatorFactoryTest {
     CommittedBundle<TestStreamIndex<Integer>> fourthBundle =
         thirdBundle.withElements(Collections.singleton(thirdResidual));
     TransformEvaluator<TestStreamIndex<Integer>> fourthEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), fourthBundle);
+        factory.forApplication(streamProducer, fourthBundle);
     fourthEvaluator.processElement(thirdResidual);
     TransformResult<TestStreamIndex<Integer>> fourthResult = fourthEvaluator.finishBundle();
 
@@ -142,7 +144,7 @@ public class TestStreamEvaluatorFactoryTest {
     CommittedBundle<TestStreamIndex<Integer>> fifthBundle =
         thirdBundle.withElements(Collections.singleton(fourthResidual));
     TransformEvaluator<TestStreamIndex<Integer>> fifthEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), fifthBundle);
+        factory.forApplication(streamProducer, fifthBundle);
     fifthEvaluator.processElement(fourthResidual);
     TransformResult<TestStreamIndex<Integer>> fifthResult = fifthEvaluator.finishBundle();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 08b1e18..4ad22bc 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -89,8 +89,9 @@ public class TransformExecutorTest {
     created = p.apply(Create.of("foo", "spam", "third"));
     PCollection<KV<Integer, String>> downstream = created.apply(WithKeys.<Integer, String>of(3));
 
-    createdProducer = created.getProducingTransformInternal();
-    downstreamProducer = downstream.getProducingTransformInternal();
+    DirectGraph graph = DirectGraphs.getGraph(p);
+    createdProducer = graph.getProducer(created);
+    downstreamProducer = graph.getProducer(downstream);
 
     when(evaluationContext.getMetrics()).thenReturn(metrics);
   }
@@ -317,7 +318,7 @@ public class TransformExecutorTest {
   @Test
   public void callWithEnforcementThrowsOnFinishPropagates() throws Exception {
     final TransformResult<Object> result =
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+        StepTransformResult.withoutHold(createdProducer).build();
 
     TransformEvaluator<Object> evaluator =
         new TransformEvaluator<Object>() {
@@ -356,7 +357,7 @@ public class TransformExecutorTest {
   @Test
   public void callWithEnforcementThrowsOnElementPropagates() throws Exception {
     final TransformResult<Object> result =
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+        StepTransformResult.withoutHold(createdProducer).build();
 
     TransformEvaluator<Object> evaluator =
         new TransformEvaluator<Object>() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 5a10134..dd36a2f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import static org.apache.beam.runners.direct.DirectGraphs.getProducer;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -90,6 +91,7 @@ public class UnboundedReadEvaluatorFactoryTest {
   private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
   private UnboundedSource<Long, ?> source;
+  private DirectGraph graph;
 
   @Before
   public void setup() {
@@ -100,6 +102,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     context = mock(EvaluationContext.class);
     factory = new UnboundedReadEvaluatorFactory(context);
     output = bundleFactory.createBundle(longs);
+    graph = DirectGraphs.getGraph(p);
     when(context.createBundle(longs)).thenReturn(output);
   }
 
@@ -115,7 +118,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     int numSplits = 5;
     Collection<CommittedBundle<?>> initialInputs =
         new UnboundedReadEvaluatorFactory.InputProvider(context)
-            .getInitialInputs(longs.getProducingTransformInternal(), numSplits);
+            .getInitialInputs(graph.getProducer(longs), numSplits);
     // CountingSource.unbounded has very good splitting behavior
     assertThat(initialInputs, hasSize(numSplits));
 
@@ -148,15 +151,14 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     Collection<CommittedBundle<?>> initialInputs =
         new UnboundedReadEvaluatorFactory.InputProvider(context)
-            .getInitialInputs(longs.getProducingTransformInternal(), 1);
+            .getInitialInputs(graph.getProducer(longs), 1);
 
     CommittedBundle<?> inputShards = Iterables.getOnlyElement(initialInputs);
     UnboundedSourceShard<Long, ?> inputShard =
         (UnboundedSourceShard<Long, ?>)
             Iterables.getOnlyElement(inputShards.getElements()).getValue();
     TransformEvaluator<? super UnboundedSourceShard<Long, ?>> evaluator =
-        factory.forApplication(
-            longs.getProducingTransformInternal(), inputShards);
+        factory.forApplication(graph.getProducer(longs), inputShards);
 
     evaluator.processElement((WindowedValue) Iterables.getOnlyElement(inputShards.getElements()));
     TransformResult<? super UnboundedSourceShard<Long, ?>> result = evaluator.finishBundle();
@@ -190,7 +192,7 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     TestPipeline p = TestPipeline.create();
     PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+    AppliedPTransform<?, ?, ?> sourceTransform = getProducer(pcollection);
 
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     Collection<CommittedBundle<?>> initialInputs =
@@ -233,7 +235,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     // Read with a very slow rate so by the second read there are no more elements
     PCollection<Long> pcollection =
         p.apply(Read.from(new TestUnboundedSource<>(VarLongCoder.of(), 1L)));
-    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+    AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection);
 
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     Collection<CommittedBundle<?>> initialInputs =
@@ -291,7 +293,9 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     TestPipeline p = TestPipeline.create();
     PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+    DirectGraph graph = DirectGraphs.getGraph(p);
+    AppliedPTransform<?, ?, ?> sourceTransform =
+        graph.getProducer(pcollection);
 
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
@@ -307,8 +311,7 @@ public class UnboundedReadEvaluatorFactoryTest {
             .commit(Instant.now());
     UnboundedReadEvaluatorFactory factory =
         new UnboundedReadEvaluatorFactory(context, 1.0 /* Always reuse */);
-    new UnboundedReadEvaluatorFactory.InputProvider(context)
-        .getInitialInputs(pcollection.getProducingTransformInternal(), 1);
+    new UnboundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(sourceTransform, 1);
     TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
         factory.forApplication(sourceTransform, inputBundle);
     evaluator.processElement(shard);
@@ -336,7 +339,8 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     TestPipeline p = TestPipeline.create();
     PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+    AppliedPTransform<?, ?, ?> sourceTransform =
+        DirectGraphs.getGraph(p).getProducer(pcollection);
 
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 7d14020..7c08009 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Values;
@@ -73,9 +74,10 @@ public class ViewEvaluatorFactoryTest {
 
     CommittedBundle<String> inputBundle =
         bundleFactory.createBundle(input).commit(Instant.now());
+    AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(view);
     TransformEvaluator<Iterable<String>> evaluator =
         new ViewEvaluatorFactory(context)
-            .forApplication(view.getProducingTransformInternal(), inputBundle);
+            .forApplication(producer, inputBundle);
 
     evaluator.processElement(
         WindowedValue.<Iterable<String>>valueInGlobalWindow(ImmutableList.of("foo", "bar")));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
index 1be9a98..acdabb6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
@@ -55,8 +55,10 @@ public class WatermarkCallbackExecutorTest {
   public void setup() {
     TestPipeline p = TestPipeline.create();
     PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
-    create = created.getProducingTransformInternal();
-    sum = created.apply(Sum.integersGlobally()).getProducingTransformInternal();
+    PCollection<Integer> summed = created.apply(Sum.integersGlobally());
+    DirectGraph graph = DirectGraphs.getGraph(p);
+    create = graph.getProducer(created);
+    sum = graph.getProducer(summed);
   }
 
   @Test


Mime
View raw message