beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-4073) The DirectRunner should interact with a Pipeline via an abstraction of the Graph rather than SDK types
Date Tue, 08 May 2018 23:24:01 GMT

     [ https://issues.apache.org/jira/browse/BEAM-4073?focusedWorklogId=99784&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-99784 ]

ASF GitHub Bot logged work on BEAM-4073:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/May/18 23:23
            Start Date: 08/May/18 23:23
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #5270: [BEAM-4073] Migrate the `portable` subpackage to use Portable representations
URL: https://github.com/apache/beam/pull/5270
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the original diff once a foreign (forked) pull request is merged, the diff is reproduced below for the sake of provenance:
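
For readers skimming the diff, the core of the migration is a swap from SDK construction types (AppliedPTransform, PCollection) to the portable graph nodes in PipelineNode (PTransformNode, PCollectionNode). Below is a minimal, illustrative Java sketch of what consuming the portable graph looks like after the change; it assumes access to the runner-internal ExecutableGraph type that appears in the diff, and the class and method names in the sketch itself are hypothetical.

    import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
    import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
    import org.apache.beam.runners.direct.ExecutableGraph;

    /** Hypothetical helper: walks a portable executable graph by node rather than by SDK type. */
    class PortableGraphSketch {
      /**
       * Logs the proto id of every executable step. Before the migration this information came
       * from AppliedPTransform.getFullName(); afterwards it comes from PTransformNode.getId().
       */
      static void logExecutableSteps(ExecutableGraph<PTransformNode, PCollectionNode> graph) {
        for (PTransformNode transform : graph.getExecutables()) {
          System.out.println("executable step: " + transform.getId());
        }
      }
    }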

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index a68f529949e..2eb27873105 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -273,6 +273,12 @@ public static String urnForTransform(PTransform<?, ?> transform) {
     return urn;
   }
 
+  /** Returns the URN for the transform if it is known, otherwise {@code null}. */
+  @Nullable
+  public static String urnForTransformOrNull(RunnerApi.PTransform transform) {
+    return transform.getSpec() == null ? null : transform.getSpec().getUrn();
+  }
+
   /**
    * A bi-directional translator between a Java-based {@link PTransform} and a protobuf payload for
    * that transform.
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java
index 1a93c621550..38f3bfa5746 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java
@@ -17,12 +17,15 @@
  */
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.sdk.annotations.Internal;
 import org.joda.time.Instant;
 
 /**
  * Access to the current time.
  */
-interface Clock {
+@Internal
+@FunctionalInterface
+public interface Clock {
   /**
    * Returns the current time as an {@link Instant}.
    */
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 64f242bf1ff..823897f102d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -57,6 +57,7 @@
 import org.apache.beam.runners.local.Bundle;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -126,7 +127,8 @@
  * Watermark_PCollection = Watermark_Out_ProducingPTransform
  * </pre>
  */
-class WatermarkManager<ExecutableT, CollectionT> {
+@Internal
+public class WatermarkManager<ExecutableT, CollectionT> {
   // The number of updates to apply in #tryApplyPendingUpdates
   private static final int MAX_INCREMENTAL_UPDATES = 10;
 
@@ -785,8 +787,9 @@ public Instant get() {
    * @param clock the clock to use to determine processing time
    * @param graph the graph representing this pipeline
    */
-  public static WatermarkManager<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> create(
-      Clock clock, DirectGraph graph) {
+  public static <ExecutableT, CollectionT>
+      WatermarkManager<ExecutableT, ? super CollectionT> create(
+          Clock clock, ExecutableGraph<ExecutableT, ? super CollectionT> graph) {
     return new WatermarkManager<>(clock, graph);
   }
 
@@ -1033,7 +1036,7 @@ private void updatePending(
    * Refresh the watermarks contained within this {@link WatermarkManager}, causing all
    * watermarks to be advanced as far as possible.
    */
-  synchronized void refreshAll() {
+  public synchronized void refreshAll() {
     refreshLock.lock();
     try {
       applyAllPendingUpdates();
@@ -1443,17 +1446,17 @@ private TimerUpdate(
     }
 
     @VisibleForTesting
-    Iterable<? extends TimerData> getCompletedTimers() {
+    public Iterable<? extends TimerData> getCompletedTimers() {
       return completedTimers;
     }
 
     @VisibleForTesting
-    Iterable<? extends TimerData> getSetTimers() {
+    public Iterable<? extends TimerData> getSetTimers() {
       return setTimers;
     }
 
     @VisibleForTesting
-    Iterable<? extends TimerData> getDeletedTimers() {
+    public Iterable<? extends TimerData> getDeletedTimers() {
       return deletedTimers;
     }
 
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/BundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/BundleFactory.java
index c6c13955380..8fa4293a0ae 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/BundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/BundleFactory.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct.portable;
 
-import org.apache.beam.runners.direct.portable.DirectGroupByKey.DirectGroupByKeyOnly;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
@@ -38,13 +38,12 @@
    * Create an {@link UncommittedBundle} from the specified input. Elements added to the bundle
    * belong to the {@code output} {@link PCollection}.
    */
-  <T> UncommittedBundle<T> createBundle(PCollection<T> output);
+  <T> UncommittedBundle<T> createBundle(PCollectionNode output);
 
   /**
    * Create an {@link UncommittedBundle} with the specified keys at the specified step. For use by
-   * {@link DirectGroupByKeyOnly} {@link PTransform PTransforms}. Elements added to the bundle
+   * {@code DirectGroupByKeyOnly} {@link PTransform PTransforms}. Elements added to the bundle
    * belong to the {@code output} {@link PCollection}.
    */
-  <K, T> UncommittedBundle<T> createKeyedBundle(
-      StructuralKey<K> key, PCollection<T> output);
+  <K, T> UncommittedBundle<T> createKeyedBundle(StructuralKey<K> key, PCollectionNode output);
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/Clock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/Clock.java
deleted file mode 100644
index 52d06a5335e..00000000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/Clock.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct.portable;
-
-import org.joda.time.Instant;
-
-/**
- * Access to the current time.
- */
-interface Clock {
-  /**
-   * Returns the current time as an {@link Instant}.
-   */
-  Instant now();
-}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CommittedBundle.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CommittedBundle.java
index 3e605376d2f..94bede0a6f2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CommittedBundle.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CommittedBundle.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.direct.portable;
 
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import org.apache.beam.runners.local.Bundle;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -33,12 +34,12 @@
  * a part of at a later point.
  * @param <T> the type of elements contained within this bundle
  */
-interface CommittedBundle<T> extends Bundle<T, PCollection<T>> {
+interface CommittedBundle<T> extends Bundle<T, PCollectionNode> {
   /**
    * Returns the PCollection that the elements of this bundle belong to.
    */
   @Nullable
-  PCollection<T> getPCollection();
+  PCollectionNode getPCollection();
 
   /**
    * Returns the key that was output in the most recent {@code GroupByKey} in the
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CommittedResult.java
index 85c14cf77fe..3e5352c600a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CommittedResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CommittedResult.java
@@ -21,7 +21,7 @@
 import com.google.auto.value.AutoValue;
 import com.google.common.base.Optional;
 import java.util.Set;
-import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 
 /**
@@ -30,7 +30,7 @@
 @AutoValue
 abstract class CommittedResult<ExecutableT> {
   /**
-   * Returns the {@link AppliedPTransform} that produced this result.
+   * Returns the {@link PTransformNode} that produced this result.
    */
   public abstract ExecutableT getExecutable();
 
@@ -40,7 +40,6 @@
    * input elements, and absent otherwise.
    */
   public abstract Optional<? extends CommittedBundle<?>> getUnprocessedInputs();
-
   /**
    * Returns the outputs produced by the transform.
    */
@@ -55,7 +54,7 @@
    */
   public abstract Set<OutputType> getProducedOutputTypes();
 
-  public static CommittedResult<AppliedPTransform<?, ?, ?>> create(
+  public static CommittedResult<PTransformNode> create(
       TransformResult<?> original,
       Optional<? extends CommittedBundle<?>> unprocessedElements,
       Iterable<? extends CommittedBundle<?>> outputs,
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CompletionCallback.java
index 5e9ab9412f9..bef0092de8b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CompletionCallback.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/CompletionCallback.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct.portable;
 
-import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
 /**
  * A callback for completing a bundle of input.
@@ -34,7 +34,7 @@ CommittedResult handleResult(
    *
    * <p>This occurs when a Source has no splits that can currently produce outputs.
    */
-  void handleEmpty(AppliedPTransform<?, ?, ?> transform);
+  void handleEmpty(PTransformNode transform);
 
   /**
    * Handle a result that terminated abnormally due to the provided {@link Exception}.
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectExecutionContext.java
index af4e166e2ac..c27c762a8eb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectExecutionContext.java
@@ -22,8 +22,9 @@
 import java.util.Map;
 import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.direct.portable.WatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.portable.WatermarkManager.TransformWatermarks;
+import org.apache.beam.runners.direct.Clock;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
 import org.apache.beam.runners.local.StructuralKey;
 
 /**
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
deleted file mode 100644
index 35eafd5e364..00000000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct.portable;
-
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems;
-import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-/** Provides an implementation of {@link GBKIntoKeyedWorkItems} for the Direct Runner. */
-class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT>
-    extends SingleInputOutputOverrideFactory<
-        PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>,
-        GBKIntoKeyedWorkItems<KeyT, InputT>> {
-  @Override
-  public PTransformReplacement<
-          PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>>
-      getReplacementTransform(
-          AppliedPTransform<
-                  PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>,
-              GBKIntoKeyedWorkItems<KeyT, InputT>>
-              transform) {
-    return PTransformReplacement.of(
-        PTransformReplacements.getSingletonMainInput(transform),
-        new DirectGroupByKey.DirectGroupByKeyOnly<>());
-  }
-}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectGroupByKeyOverrideFactory.java
deleted file mode 100644
index 114f6f2d1a1..00000000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectGroupByKeyOverrideFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct.portable;
-
-import com.google.common.collect.Iterables;
-import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-/** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */
-final class DirectGroupByKeyOverrideFactory<K, V>
-    extends SingleInputOutputOverrideFactory<
-        PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
-        PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
-  @Override
-  public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
-      getReplacementTransform(
-          AppliedPTransform<
-                  PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
-                  PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>>
-              transform) {
-
-    PCollection<KV<K, Iterable<V>>> output =
-        (PCollection<KV<K, Iterable<V>>>) Iterables.getOnlyElement(transform.getOutputs().values());
-
-    return PTransformReplacement.of(
-        PTransformReplacements.getSingletonMainInput(transform),
-        new DirectGroupByKey<>(transform.getTransform(), output.getWindowingStrategy()));
-  }
-}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectTimerInternals.java
index 32524f0ed13..94f80565ad3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectTimerInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectTimerInternals.java
@@ -20,9 +20,10 @@
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.direct.portable.WatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.portable.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
-import org.apache.beam.runners.direct.portable.WatermarkManager.TransformWatermarks;
+import org.apache.beam.runners.direct.Clock;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
+import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.joda.time.Instant;
 
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectTransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectTransformExecutor.java
index 223467c733b..4dfbd023dfc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectTransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectTransformExecutor.java
@@ -20,10 +20,10 @@
 import com.google.common.annotations.VisibleForTesting;
 import java.io.Closeable;
 import java.util.concurrent.Callable;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.core.metrics.MetricUpdates;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,7 +48,7 @@
     @Override
     public TransformExecutor create(
         CommittedBundle<?> bundle,
-        AppliedPTransform<?, ?, ?> transform,
+        PTransformNode transform,
         CompletionCallback onComplete,
         TransformExecutorService executorService) {
       return new DirectTransformExecutor<>(
@@ -59,7 +59,7 @@ public TransformExecutor create(
   private final TransformEvaluatorRegistry evaluatorRegistry;
 
   /** The transform that will be evaluated. */
-  private final AppliedPTransform<?, ?, ?> transform;
+  private final PTransformNode transform;
   /** The inputs this {@link DirectTransformExecutor} will deliver to the transform. */
   private final CommittedBundle<T> inputBundle;
 
@@ -72,7 +72,7 @@ public TransformExecutor create(
       EvaluationContext context,
       TransformEvaluatorRegistry factory,
       CommittedBundle<T> inputBundle,
-      AppliedPTransform<?, ?, ?> transform,
+      PTransformNode transform,
       CompletionCallback completionCallback,
       TransformExecutorService transformEvaluationState) {
     this.evaluatorRegistry = factory;
@@ -88,7 +88,7 @@ public TransformExecutor create(
 
   @Override
   public void run() {
-    MetricsContainerImpl metricsContainer = new MetricsContainerImpl(transform.getFullName());
+    MetricsContainerImpl metricsContainer = new MetricsContainerImpl(transform.getId());
     try (Closeable metricsScope = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
       TransformEvaluator<T> evaluator =
           evaluatorRegistry.forApplication(transform, inputBundle);
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/EvaluationContext.java
index b2da88665ea..e387272e9a4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/EvaluationContext.java
@@ -31,14 +31,16 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.direct.Clock;
 import org.apache.beam.runners.direct.ExecutableGraph;
+import org.apache.beam.runners.direct.WatermarkManager;
+import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
+import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
 import org.apache.beam.runners.direct.portable.CommittedResult.OutputType;
-import org.apache.beam.runners.direct.portable.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.direct.portable.WatermarkManager.FiredTimers;
-import org.apache.beam.runners.direct.portable.WatermarkManager.TransformWatermarks;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
@@ -65,18 +67,15 @@
  * executed.
  */
 class EvaluationContext {
-  /**
-   * The graph representing this {@link Pipeline}.
-   */
-  private final ExecutableGraph<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> graph;
+  /** The graph representing this {@link Pipeline}. */
+  private final ExecutableGraph<PTransformNode, ? super PCollectionNode> graph;
 
   private final Clock clock;
 
   private final BundleFactory bundleFactory;
 
   /** The current processing time and event time watermarks and timers. */
-  private final WatermarkManager<AppliedPTransform<?, ?, ?>, ? super PCollection<?>>
-      watermarkManager;
+  private final WatermarkManager<PTransformNode, ? super PCollectionNode> watermarkManager;
 
   /** Executes callbacks based on the progression of the watermark. */
   private final WatermarkCallbackExecutor callbackExecutor;
@@ -87,21 +86,21 @@
 
   private final DirectMetrics metrics;
 
-  private final Set<PValue> keyedPValues;
+  private final Set<PCollectionNode> keyedPValues;
 
   public static EvaluationContext create(
       Clock clock,
       BundleFactory bundleFactory,
-      ExecutableGraph<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> graph,
-      Set<PValue> keyedPValues) {
+      ExecutableGraph<PTransformNode, ? super PCollectionNode> graph,
+      Set<PCollectionNode> keyedPValues) {
     return new EvaluationContext(clock, bundleFactory, graph, keyedPValues);
   }
 
   private EvaluationContext(
       Clock clock,
       BundleFactory bundleFactory,
-      ExecutableGraph<AppliedPTransform<?, ?, ?>, ? super PCollection<?>>graph,
-      Set<PValue> keyedPValues) {
+      ExecutableGraph<PTransformNode, ? super PCollectionNode>graph,
+      Set<PCollectionNode> keyedPValues) {
     this.clock = clock;
     this.bundleFactory = checkNotNull(bundleFactory);
     this.graph = checkNotNull(graph);
@@ -116,7 +115,7 @@ private EvaluationContext(
   }
 
   public void initialize(
-      Map<AppliedPTransform<?, ?, ?>, ? extends Iterable<CommittedBundle<?>>> initialInputs) {
+      Map<PTransformNode, ? extends Iterable<CommittedBundle<?>>> initialInputs) {
     watermarkManager.initialize((Map) initialInputs);
   }
 
@@ -135,7 +134,7 @@ public void initialize(
    * @param result the result of evaluating the input bundle
    * @return the committed bundles contained within the handled {@code result}
    */
-  public CommittedResult<AppliedPTransform<?, ?, ?>> handleResult(
+  public CommittedResult<PTransformNode> handleResult(
       CommittedBundle<?> completedBundle,
       Iterable<TimerData> completedTimers,
       TransformResult<?> result) {
@@ -150,7 +149,7 @@ public void initialize(
     } else {
       outputTypes.add(OutputType.BUNDLE);
     }
-    CommittedResult<AppliedPTransform<?, ?, ?>> committedResult =
+    CommittedResult<PTransformNode> committedResult =
         CommittedResult.create(
             result, getUnprocessedInput(completedBundle, result), committedBundles, outputTypes);
     // Update state internals
@@ -196,7 +195,7 @@ public void initialize(
       Iterable<? extends UncommittedBundle<?>> bundles) {
     ImmutableList.Builder<CommittedBundle<?>> completed = ImmutableList.builder();
     for (UncommittedBundle<?> inProgress : bundles) {
-      AppliedPTransform<?, ?, ?> producing =
+      PTransformNode producing =
           graph.getProducer(inProgress.getPCollection());
       TransformWatermarks watermarks = watermarkManager.getWatermarks(producing);
       CommittedBundle<?> committed =
@@ -211,12 +210,12 @@ public void initialize(
   }
 
   private void fireAllAvailableCallbacks() {
-    for (AppliedPTransform<?, ?, ?> transform : graph.getExecutables()) {
+    for (PTransformNode transform : graph.getExecutables()) {
       fireAvailableCallbacks(transform);
     }
   }
 
-  private void fireAvailableCallbacks(AppliedPTransform<?, ?, ?> producingTransform) {
+  private void fireAvailableCallbacks(PTransformNode producingTransform) {
     TransformWatermarks watermarks = watermarkManager.getWatermarks(producingTransform);
     Instant outputWatermark = watermarks.getOutputWatermark();
     callbackExecutor.fireForWatermark(producingTransform, outputWatermark);
@@ -233,16 +232,16 @@ private void fireAvailableCallbacks(AppliedPTransform<?, ?, ?> producingTransfor
    * Create a {@link UncommittedBundle} whose elements belong to the specified {@link
    * PCollection}.
    */
-  public <T> UncommittedBundle<T> createBundle(PCollection<T> output) {
+  public <T> UncommittedBundle<T> createBundle(PCollectionNode output) {
     return bundleFactory.createBundle(output);
   }
 
   /**
    * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by
-   * {@link DirectGroupByKeyOnly} {@link PTransform PTransforms}.
+   * {@code DirectGroupByKeyOnly} {@link PTransform PTransforms}.
    */
   public <K, T> UncommittedBundle<T> createKeyedBundle(
-      StructuralKey<K> key, PCollection<T> output) {
+      StructuralKey<K> key, PCollectionNode output) {
     return bundleFactory.createKeyedBundle(key, output);
   }
 
@@ -250,7 +249,7 @@ private void fireAvailableCallbacks(AppliedPTransform<?, ?, ?> producingTransfor
    * Indicate whether or not this {@link PCollection} has been determined to be
    * keyed.
    */
-  public <T> boolean isKeyed(PValue pValue) {
+  public <T> boolean isKeyed(PCollectionNode pValue) {
     return keyedPValues.contains(pValue);
   }
 
@@ -267,11 +266,11 @@ private void fireAvailableCallbacks(AppliedPTransform<?, ?, ?> producingTransfor
    * callback will be executed regardless of whether values have been produced.
    */
   public void scheduleAfterOutputWouldBeProduced(
-      PCollection<?> value,
+      PCollectionNode value,
       BoundedWindow window,
       WindowingStrategy<?, ?> windowingStrategy,
       Runnable runnable) {
-    AppliedPTransform<?, ?, ?> producing = graph.getProducer(value);
+    PTransformNode producing = graph.getProducer(value);
     callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable);
 
     fireAvailableCallbacks(producing);
@@ -283,7 +282,7 @@ public void scheduleAfterOutputWouldBeProduced(
    * <p>For example, upstream state associated with the window may be cleared.
    */
   public void scheduleAfterWindowExpiration(
-      AppliedPTransform<?, ?, ?> producing,
+      PTransformNode producing,
       BoundedWindow window,
       WindowingStrategy<?, ?> windowingStrategy,
       Runnable runnable) {
@@ -293,15 +292,15 @@ public void scheduleAfterWindowExpiration(
   }
 
   /**
-   * Get a {@link DirectExecutionContext} for the provided {@link AppliedPTransform} and key.
+   * Get a {@link DirectExecutionContext} for the provided {@link PTransformNode} and key.
    */
   public DirectExecutionContext getExecutionContext(
-      AppliedPTransform<?, ?, ?> application, StructuralKey<?> key) {
+      PTransformNode application, StructuralKey<?> key) {
     StepAndKey stepAndKey = StepAndKey.of(application, key);
     return new DirectExecutionContext(
         clock,
         key,
-        (CopyOnAccessInMemoryStateInternals) applicationStateInternals.get(stepAndKey),
+        applicationStateInternals.get(stepAndKey),
         watermarkManager.getWatermarks(application));
   }
 
@@ -309,12 +308,12 @@ public DirectExecutionContext getExecutionContext(
   /**
    * Get the Step Name for the provided application.
    */
-  String getStepName(AppliedPTransform<?, ?, ?> application) {
+  String getStepName(PTransformNode application) {
     throw new UnsupportedOperationException("getStepName Unsupported");
   }
 
   /** Returns all of the steps in this {@link Pipeline}. */
-  Collection<AppliedPTransform<?, ?, ?>> getSteps() {
+  Collection<PTransformNode> getSteps() {
     return graph.getExecutables();
   }
 
@@ -335,7 +334,7 @@ void forceRefresh() {
    * <p>This is a destructive operation. Timers will only appear in the result of this method once
    * for each time they are set.
    */
-  public Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> extractFiredTimers() {
+  public Collection<FiredTimers<PTransformNode>> extractFiredTimers() {
     forceRefresh();
     return watermarkManager.extractFiredTimers();
   }
@@ -343,7 +342,7 @@ void forceRefresh() {
   /**
    * Returns true if the step will not produce additional output.
    */
-  public boolean isDone(AppliedPTransform<?, ?, ?> transform) {
+  public boolean isDone(PTransformNode transform) {
     // the PTransform is done only if watermark is at the max value
     Instant stepWatermark = watermarkManager.getWatermarks(transform).getOutputWatermark();
     return !stepWatermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -353,7 +352,7 @@ public boolean isDone(AppliedPTransform<?, ?, ?> transform) {
    * Returns true if all steps are done.
    */
   public boolean isDone() {
-    for (AppliedPTransform<?, ?, ?> transform : graph.getExecutables()) {
+    for (PTransformNode transform : graph.getExecutables()) {
       if (!isDone(transform)) {
         return false;
       }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
index 5a5766936f7..0e5eeec8ae3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
@@ -36,16 +36,15 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.direct.ExecutableGraph;
 import org.apache.beam.runners.local.ExecutionDriver;
 import org.apache.beam.runners.local.ExecutionDriver.DriverState;
 import org.apache.beam.runners.local.PipelineMessageReceiver;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -57,7 +56,7 @@
  */
 final class ExecutorServiceParallelExecutor
     implements PipelineExecutor,
-        BundleProcessor<PCollection<?>, CommittedBundle<?>, AppliedPTransform<?, ?, ?>> {
+        BundleProcessor<PCollectionNode, CommittedBundle<?>, PTransformNode> {
   private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);
 
   private final int targetParallelism;
@@ -131,12 +130,12 @@ public TransformExecutorService load(StepAndKey stepAndKey) throws Exception {
 
   @Override
   public void start(
-      ExecutableGraph<AppliedPTransform<?, ?, ?>, PCollection<?>> graph,
+      ExecutableGraph<PTransformNode, PCollectionNode> graph,
       RootProviderRegistry rootProviderRegistry) {
     int numTargetSplits = Math.max(3, targetParallelism);
-    ImmutableMap.Builder<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>>
+    ImmutableMap.Builder<PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>>
         pendingRootBundles = ImmutableMap.builder();
-    for (AppliedPTransform<?, ?, ?> root : graph.getRootTransforms()) {
+    for (PTransformNode root : graph.getRootTransforms()) {
       ConcurrentLinkedQueue<CommittedBundle<?>> pending = new ConcurrentLinkedQueue<>();
       try {
         Collection<CommittedBundle<?>> initialInputs =
@@ -184,13 +183,13 @@ public void run() {
   @Override
   public void process(
       CommittedBundle<?> bundle,
-      AppliedPTransform<?, ?, ?> consumer,
+      PTransformNode consumer,
       CompletionCallback onComplete) {
     evaluateBundle(consumer, bundle, onComplete);
   }
 
   private <T> void evaluateBundle(
-      final AppliedPTransform<?, ?, ?> transform,
+      final PTransformNode transform,
       final CommittedBundle<T> bundle,
       final CompletionCallback onComplete) {
     TransformExecutorService transformExecutor;
@@ -214,7 +213,7 @@ public void process(
     }
   }
 
-  private boolean isKeyed(PValue pvalue) {
+  private boolean isKeyed(PCollectionNode pvalue) {
     return evaluationContext.isKeyed(pvalue);
   }
 
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/FlattenEvaluatorFactory.java
index 0e3de00d430..5ee61dfcbea 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/FlattenEvaluatorFactory.java
@@ -17,14 +17,10 @@
  */
 package org.apache.beam.runners.direct.portable;
 
-import com.google.common.collect.Iterables;
-import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Flatten.PCollections;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
 
 /**
  * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link Flatten}
@@ -39,10 +35,9 @@
 
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) {
+      PTransformNode application,  CommittedBundle<?> inputBundle) {
     @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<InputT> evaluator =
-        (TransformEvaluator<InputT>) createInMemoryEvaluator((AppliedPTransform) application);
+    TransformEvaluator<InputT> evaluator = createInMemoryEvaluator(application);
     return evaluator;
   }
 
@@ -50,15 +45,8 @@
   public void cleanup() throws Exception {}
 
   private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator(
-      final AppliedPTransform<
-              PCollectionList<InputT>, PCollection<InputT>, PCollections<InputT>>
-          application) {
-    final UncommittedBundle<InputT> outputBundle =
-        evaluationContext.createBundle(
-            (PCollection<InputT>) Iterables.getOnlyElement(application.getOutputs().values()));
-    final TransformResult<InputT> result =
-        StepTransformResult.<InputT>withoutHold(application).addOutput(outputBundle).build();
-    return new FlattenEvaluator<>(outputBundle, result);
+      final PTransformNode application) {
+    throw new UnsupportedOperationException("Not yet implemented");
   }
 
   private static class FlattenEvaluator<InputT> implements TransformEvaluator<InputT> {
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/GroupAlsoByWindowEvaluatorFactory.java
index 9f3bf885d89..c29ea01b364 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/GroupAlsoByWindowEvaluatorFactory.java
@@ -33,6 +33,8 @@
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.UnsupportedSideInputReader;
 import org.apache.beam.runners.core.construction.TriggerTranslation;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.direct.portable.DirectExecutionContext.DirectStepContext;
@@ -42,21 +44,19 @@
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Instant;
 
 /**
  * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
- * {@link DirectGroupAlsoByWindow} {@link PTransform}.
+ * {@code DirectGroupAlsoByWindow} {@link PTransform}.
  */
 class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
   private final EvaluationContext evaluationContext;
@@ -70,12 +70,11 @@
 
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
+      PTransformNode application,
       CommittedBundle<?> inputBundle) {
     @SuppressWarnings({"cast", "unchecked", "rawtypes"})
     TransformEvaluator<InputT> evaluator =
-        createEvaluator(
-            (AppliedPTransform) application, (CommittedBundle) inputBundle);
+        createEvaluator(application, (CommittedBundle) inputBundle);
     return evaluator;
   }
 
@@ -83,18 +82,13 @@
   public void cleanup() {}
 
   private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(
-      AppliedPTransform<
-              PCollection<KeyedWorkItem<K, V>>,
-              PCollection<KV<K, Iterable<V>>>,
-              DirectGroupAlsoByWindow<K, V>>
-          application,
-      CommittedBundle<KeyedWorkItem<K, V>> inputBundle) {
+      PTransformNode application, CommittedBundle<KeyedWorkItem<K, V>> inputBundle) {
     return new GroupAlsoByWindowEvaluator<>(
         evaluationContext, options, inputBundle, application);
   }
 
   /**
-   * A transform evaluator for the pseudo-primitive {@link DirectGroupAlsoByWindow}. The window of
+   * A transform evaluator for the pseudo-primitive {@code DirectGroupAlsoByWindow}. The window of
    * the input {@link KeyedWorkItem} is ignored; it should be in the global window, as element
    * windows are reified in the {@link KeyedWorkItem#elementsIterable()}.
    *
@@ -104,10 +98,7 @@ public void cleanup() {}
       implements TransformEvaluator<KeyedWorkItem<K, V>> {
     private final EvaluationContext evaluationContext;
     private final PipelineOptions options;
-    private final AppliedPTransform<
-        PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
-        DirectGroupAlsoByWindow<K, V>>
-        application;
+    private final PTransformNode application;
 
     private final DirectStepContext stepContext;
     private @SuppressWarnings("unchecked") final WindowingStrategy<?, BoundedWindow>
@@ -124,11 +115,7 @@ public GroupAlsoByWindowEvaluator(
         final EvaluationContext evaluationContext,
         PipelineOptions options,
         CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
-        final AppliedPTransform<
-                PCollection<KeyedWorkItem<K, V>>,
-                PCollection<KV<K, Iterable<V>>>,
-                DirectGroupAlsoByWindow<K, V>>
-            application) {
+        final PTransformNode application) {
       this.evaluationContext = evaluationContext;
       this.options = options;
       this.application = application;
@@ -138,18 +125,20 @@ public GroupAlsoByWindowEvaluator(
           .getExecutionContext(application, inputBundle.getKey())
           .getStepContext(
               evaluationContext.getStepName(application));
+      // TODO: extract from input PCollection via the graph, including length-prefixing
       windowingStrategy =
-          (WindowingStrategy<?, BoundedWindow>)
-              application.getTransform().getInputWindowingStrategy();
+          null;
 
       outputBundles = new ArrayList<>();
       unprocessedElements = ImmutableList.builder();
 
+      // TODO: extract from input PCollection via the graph, including length-prefixing
       Coder<V> valueCoder =
-          application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder());
+          null;
       reduceFn = SystemReduceFn.buffering(valueCoder);
       droppedDueToLateness = Metrics.counter(GroupAlsoByWindowEvaluator.class,
           GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER);
+      throw new UnsupportedOperationException("Not yet migrated");
     }
 
     @Override
@@ -157,11 +146,9 @@ public void processElement(WindowedValue<KeyedWorkItem<K, V>> element) throws Ex
       KeyedWorkItem<K, V> workItem = element.getValue();
       K key = workItem.key();
 
+      PCollectionNode outputNode = null;
       UncommittedBundle<KV<K, Iterable<V>>> bundle =
-          evaluationContext.createKeyedBundle(
-              structuralKey,
-              (PCollection<KV<K, Iterable<V>>>)
-                  Iterables.getOnlyElement(application.getOutputs().values()));
+          evaluationContext.createKeyedBundle(structuralKey, outputNode);
       outputBundles.add(bundle);
       CopyOnAccessInMemoryStateInternals stateInternals = stepContext.stateInternals();
       DirectTimerInternals timerInternals = stepContext.timerInternals();
@@ -185,6 +172,7 @@ public void processElement(WindowedValue<KeyedWorkItem<K, V>> element) throws Ex
           dropExpiredWindows(key, workItem.elementsIterable(), timerInternals));
       reduceFnRunner.onTimers(workItem.timersIterable());
       reduceFnRunner.persist();
+      throw new UnsupportedOperationException("Not yet migrated");
     }
 
     @Override
@@ -261,7 +249,7 @@ public void outputWindowedValue(
         PaneInfo pane) {
       throw new UnsupportedOperationException(
           String.format(
-              "%s should not use tagged outputs", DirectGroupAlsoByWindow.class.getSimpleName()));
+              "%s should not use tagged outputs", "DirectGroupAlsoByWindow"));
     }
   }
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactory.java
index fd42d0d5493..ab1538e1c7f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactory.java
@@ -19,7 +19,6 @@
 
 import static com.google.common.base.Preconditions.checkState;
 
-import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -28,16 +27,15 @@
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.direct.portable.DirectGroupByKey.DirectGroupByKeyOnly;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.direct.portable.StepTransformResult.Builder;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
 
 /**
  * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
@@ -52,22 +50,17 @@
 
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
+      PTransformNode application,
       CommittedBundle<?> inputBundle) {
     @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<InputT> evaluator =
-        createEvaluator(
-            (AppliedPTransform) application);
+    TransformEvaluator<InputT> evaluator = (TransformEvaluator) createEvaluator(application);
     return evaluator;
   }
 
   @Override
   public void cleanup() {}
 
-  private <K, V> TransformEvaluator<KV<K, V>> createEvaluator(
-      final AppliedPTransform<
-              PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>, DirectGroupByKeyOnly<K, V>>
-          application) {
+  private <K, V> TransformEvaluator<KV<K, V>> createEvaluator(final PTransformNode application) {
     return new GroupByKeyOnlyEvaluator<>(evaluationContext, application);
   }
 
@@ -81,26 +74,18 @@ public void cleanup() {}
       implements TransformEvaluator<KV<K, V>> {
     private final EvaluationContext evaluationContext;
 
-    private final AppliedPTransform<
-            PCollection<KV<K, V>>,
-            PCollection<KeyedWorkItem<K, V>>,
-            DirectGroupByKeyOnly<K, V>> application;
+    private final PTransformNode application;
     private final Coder<K> keyCoder;
     private Map<StructuralKey<K>, List<WindowedValue<V>>> groupingMap;
 
     public GroupByKeyOnlyEvaluator(
         EvaluationContext evaluationContext,
-        AppliedPTransform<
-            PCollection<KV<K, V>>,
-            PCollection<KeyedWorkItem<K, V>>,
-            DirectGroupByKeyOnly<K, V>> application) {
+        PTransformNode application) {
       this.evaluationContext = evaluationContext;
       this.application = application;
-      this.keyCoder =
-          getKeyCoder(
-              ((PCollection<KV<K, V>>) Iterables.getOnlyElement(application.getInputs().values()))
-                  .getCoder());
+      this.keyCoder = null;
       this.groupingMap = new HashMap<>();
+      throw new UnsupportedOperationException("Not yet migrated");
     }
 
     private Coder<K> getKeyCoder(Coder<KV<K, V>> coder) {
@@ -134,15 +119,13 @@ public void processElement(WindowedValue<KV<K, V>> element) {
         K key = groupedEntry.getKey().getKey();
         KeyedWorkItem<K, V> groupedKv =
             KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
+        PCollectionNode outputNode = null;
         UncommittedBundle<KeyedWorkItem<K, V>> bundle =
-            evaluationContext.createKeyedBundle(
-                StructuralKey.of(key, keyCoder),
-                (PCollection<KeyedWorkItem<K, V>>)
-                    Iterables.getOnlyElement(application.getOutputs().values()));
+            evaluationContext.createKeyedBundle(StructuralKey.of(key, keyCoder), outputNode);
         bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
         resultBuilder.addOutput(bundle);
       }
-      return resultBuilder.build();
+      throw new UnsupportedOperationException("Not yet migrated");
     }
   }
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ImmutableListBundleFactory.java
index 7b90af47b0a..8f9127b94b5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ImmutableListBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ImmutableListBundleFactory.java
@@ -25,6 +25,7 @@
 import java.util.Iterator;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -49,13 +50,13 @@ private ImmutableListBundleFactory() {}
   }
 
   @Override
-  public <T> UncommittedBundle<T> createBundle(PCollection<T> output) {
+  public <T> UncommittedBundle<T> createBundle(PCollectionNode output) {
     return UncommittedImmutableListBundle.create(output, StructuralKey.empty());
   }
 
   @Override
   public <K, T> UncommittedBundle<T> createKeyedBundle(
-      StructuralKey<K> key, PCollection<T> output) {
+      StructuralKey<K> key, PCollectionNode output) {
     return UncommittedImmutableListBundle.create(output, key);
   }
 
@@ -63,7 +64,7 @@ private ImmutableListBundleFactory() {}
    * A {@link UncommittedBundle} that buffers elements in memory.
    */
   private static final class UncommittedImmutableListBundle<T> implements UncommittedBundle<T> {
-    private final PCollection<T> pcollection;
+    private final PCollectionNode pcollection;
     private final StructuralKey<?> key;
     private boolean committed = false;
     private ImmutableList.Builder<WindowedValue<T>> elements;
@@ -73,19 +74,19 @@ private ImmutableListBundleFactory() {}
      * Create a new {@link UncommittedImmutableListBundle} for the specified {@link PCollection}.
      */
     public static <T> UncommittedImmutableListBundle<T> create(
-        PCollection<T> pcollection,
+        PCollectionNode pcollection,
         StructuralKey<?> key) {
       return new UncommittedImmutableListBundle<>(pcollection, key);
     }
 
-    private UncommittedImmutableListBundle(PCollection<T> pcollection, StructuralKey<?> key) {
+    private UncommittedImmutableListBundle(PCollectionNode pcollection, StructuralKey<?> key) {
       this.pcollection = pcollection;
       this.key = key;
       this.elements = ImmutableList.builder();
     }
 
     @Override
-    public PCollection<T> getPCollection() {
+    public PCollectionNode getPCollection() {
       return pcollection;
     }
 
@@ -121,7 +122,7 @@ private UncommittedImmutableListBundle(PCollection<T> pcollection, StructuralKey
   @AutoValue
   abstract static class CommittedImmutableListBundle<T> implements CommittedBundle<T> {
     public static <T> CommittedImmutableListBundle<T> create(
-        @Nullable PCollection<T> pcollection,
+        @Nullable PCollectionNode pcollection,
         StructuralKey<?> key,
         Iterable<WindowedValue<T>> committedElements,
         Instant minElementTimestamp,
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ImpulseEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ImpulseEvaluatorFactory.java
index 6c4dc73334c..4f00de1a8a4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ImpulseEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ImpulseEvaluatorFactory.java
@@ -18,17 +18,15 @@
 package org.apache.beam.runners.direct.portable;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
 import java.util.Collection;
 import java.util.Collections;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
 
 /** The evaluator for the {@link Impulse} transform. Produces only empty byte arrays. */
 class ImpulseEvaluatorFactory implements TransformEvaluatorFactory {
@@ -41,8 +39,8 @@
   @Nullable
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) {
-    return (TransformEvaluator<InputT>) new ImpulseEvaluator(ctxt, (AppliedPTransform) application);
+      PTransformNode application, CommittedBundle<?> inputBundle) {
+    return (TransformEvaluator<InputT>) new ImpulseEvaluator(ctxt, application);
   }
 
   @Override
@@ -52,11 +50,10 @@ public void cleanup() {
 
   private static class ImpulseEvaluator implements TransformEvaluator<ImpulseShard> {
     private final EvaluationContext ctxt;
-    private final AppliedPTransform<?, PCollection<byte[]>, Impulse> transform;
+    private final PTransformNode transform;
     private final StepTransformResult.Builder<ImpulseShard> result;
 
-    private ImpulseEvaluator(
-        EvaluationContext ctxt, AppliedPTransform<?, PCollection<byte[]>, Impulse> transform) {
+    private ImpulseEvaluator(EvaluationContext ctxt, PTransformNode transform) {
       this.ctxt = ctxt;
       this.transform = transform;
       this.result = StepTransformResult.withoutHold(transform);
@@ -64,10 +61,10 @@ private ImpulseEvaluator(
 
     @Override
     public void processElement(WindowedValue<ImpulseShard> element) throws Exception {
-      PCollection<byte[]> outputPCollection =
-          (PCollection<byte[]>) Iterables.getOnlyElement(transform.getOutputs().values());
+      PCollectionNode outputPCollection = null;
       result.addOutput(
           ctxt.createBundle(outputPCollection).add(WindowedValue.valueInGlobalWindow(new byte[0])));
+      throw new UnsupportedOperationException("Not yet migrated");
     }
 
     @Override
@@ -80,7 +77,7 @@ public void processElement(WindowedValue<ImpulseShard> element) throws Exception
    * The {@link RootInputProvider} for the {@link Impulse} {@link PTransform}. Produces a single
    * {@link ImpulseShard}.
    */
-  static class ImpulseRootProvider implements RootInputProvider<byte[], ImpulseShard, PBegin> {
+  static class ImpulseRootProvider implements RootInputProvider<ImpulseShard> {
     private final EvaluationContext ctxt;
 
     ImpulseRootProvider(EvaluationContext ctxt) {
@@ -89,9 +86,7 @@ public void processElement(WindowedValue<ImpulseShard> element) throws Exception
 
     @Override
     public Collection<CommittedBundle<ImpulseShard>> getInitialInputs(
-        AppliedPTransform<PBegin, PCollection<byte[]>, PTransform<PBegin, PCollection<byte[]>>>
-            transform,
-        int targetParallelism) {
+        PTransformNode transform, int targetParallelism) {
       return Collections.singleton(
           ctxt.<ImpulseShard>createRootBundle()
               .add(WindowedValue.valueInGlobalWindow(new ImpulseShard()))
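
As a hedged illustration of the portable types these signatures now take, the sketch below builds a RunnerApi.PTransform proto, wraps it in a PTransformNode, and asks for its URN. The PipelineNode.pTransform factory call and the literal URN string are assumptions made for the example; the registry code in this PR uses the IMPULSE_TRANSFORM_URN constant instead.

    import org.apache.beam.model.pipeline.v1.RunnerApi;
    import org.apache.beam.runners.core.construction.PTransformTranslation;
    import org.apache.beam.runners.core.construction.graph.PipelineNode;
    import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;

    public class UrnLookupSketch {
      public static void main(String[] args) {
        // URN value is illustrative only.
        String impulseUrn = "beam:transform:impulse:v1";
        RunnerApi.PTransform impulseProto =
            RunnerApi.PTransform.newBuilder()
                .setUniqueName("Impulse")
                .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(impulseUrn))
                .build();
        // Wrap the proto in the graph node type used throughout the migrated package.
        PTransformNode node = PipelineNode.pTransform("impulse", impulseProto);
        // urnForTransformOrNull returns the spec's URN, or null when the spec is unset.
        System.out.println(PTransformTranslation.urnForTransformOrNull(node.getTransform()));
      }
    }
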
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/NanosOffsetClock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/NanosOffsetClock.java
deleted file mode 100644
index 4f8431e2c01..00000000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/NanosOffsetClock.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct.portable;
-
-import java.util.concurrent.TimeUnit;
-import org.joda.time.Instant;
-
-/**
- * A {@link Clock} that uses {@link System#nanoTime()} to track the progress of time.
- */
-class NanosOffsetClock implements Clock {
-  private final long baseMillis;
-  private final long nanosAtBaseMillis;
-
-  public static NanosOffsetClock create() {
-    return new NanosOffsetClock();
-  }
-
-  private NanosOffsetClock() {
-    baseMillis = System.currentTimeMillis();
-    nanosAtBaseMillis = System.nanoTime();
-  }
-
-  @Override
-  public Instant now() {
-    return new Instant(
-        baseMillis + (TimeUnit.MILLISECONDS.convert(
-            System.nanoTime() - nanosAtBaseMillis, TimeUnit.NANOSECONDS)));
-  }
-}
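
The removed clock is small enough to restate; here is a standalone sketch (plain Java, no Beam types) of the same idea: capture a wall-clock base once, then derive the current time from the monotonic System.nanoTime() offset.

    import java.util.concurrent.TimeUnit;

    final class MonotonicOffsetClock {
      private final long baseMillis = System.currentTimeMillis();   // wall-clock anchor
      private final long nanosAtBase = System.nanoTime();           // monotonic anchor

      long nowMillis() {
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanosAtBase);
        return baseMillis + elapsedMillis;
      }
    }
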
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PassthroughTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PassthroughTransformEvaluator.java
index 8a696d32e00..97dca8d9e4b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PassthroughTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PassthroughTransformEvaluator.java
@@ -17,20 +17,20 @@
  */
 package org.apache.beam.runners.direct.portable;
 
-import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.sdk.util.WindowedValue;
 
 class PassthroughTransformEvaluator<InputT> implements TransformEvaluator<InputT> {
   public static <InputT> PassthroughTransformEvaluator<InputT> create(
-      AppliedPTransform<?, ?, ?> transform, UncommittedBundle<InputT> output) {
+      PTransformNode transform, UncommittedBundle<InputT> output) {
     return new PassthroughTransformEvaluator<>(transform, output);
   }
 
-  private final AppliedPTransform<?, ?, ?> transform;
+  private final PTransformNode transform;
   private final UncommittedBundle<InputT> output;
 
   private PassthroughTransformEvaluator(
-      AppliedPTransform<?, ?, ?> transform, UncommittedBundle<InputT> output) {
+      PTransformNode transform, UncommittedBundle<InputT> output) {
     this.transform = transform;
     this.output = output;
   }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PipelineExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PipelineExecutor.java
index e394eac2d03..c921fada195 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PipelineExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PipelineExecutor.java
@@ -17,17 +17,17 @@
  */
 package org.apache.beam.runners.direct.portable;
 
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.direct.ExecutableGraph;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 
 /**
- * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both
- * source and intermediate {@link PTransform PTransforms}.
+ * An executor that schedules and executes {@link PTransformNode PTransformNodes} for both source
+ * and intermediate {@link PTransform PTransforms}.
  */
 interface PipelineExecutor {
   /**
@@ -35,12 +35,12 @@
    * create initial inputs for the provided {@link ExecutableGraph graph}.
    */
   void start(
-      ExecutableGraph<AppliedPTransform<?, ?, ?>, PCollection<?>> graph,
+      ExecutableGraph<PTransformNode, PCollectionNode> graph,
       RootProviderRegistry rootProviderRegistry);
 
   /**
    * Blocks until the job being executed enters a terminal state. A job is completed after all root
-   * {@link AppliedPTransform AppliedPTransforms} have completed, and all {@link CommittedBundle
+   * {@link PTransformNode PTransformNodes} have completed, and all {@link CommittedBundle
    * Bundles} have been consumed. Jobs may also terminate abnormally.
    *
    * <p>Waits for up to the provided duration, or forever if the provided duration is less than or
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PortableGraph.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PortableGraph.java
similarity index 96%
rename from runners/direct-java/src/main/java/org/apache/beam/runners/direct/PortableGraph.java
rename to runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PortableGraph.java
index 0e349a2203b..d755702a791 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PortableGraph.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PortableGraph.java
@@ -16,13 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.beam.runners.direct;
+package org.apache.beam.runners.direct.portable;
 
 import java.util.Collection;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.direct.ExecutableGraph;
 
 /** A {@link ExecutableGraph} for a Portable {@link RunnerApi.Pipeline}. */
 class PortableGraph implements ExecutableGraph<PTransformNode, PCollectionNode> {
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/QuiescenceDriver.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/QuiescenceDriver.java
index 965f0f1f2c2..a9cf1471235 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/QuiescenceDriver.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/QuiescenceDriver.java
@@ -20,7 +20,6 @@
 
 import com.google.auto.value.AutoValue;
 import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -32,13 +31,13 @@
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.direct.ExecutableGraph;
-import org.apache.beam.runners.direct.portable.WatermarkManager.FiredTimers;
+import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.runners.local.ExecutionDriver;
 import org.apache.beam.runners.local.PipelineMessageReceiver;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,24 +51,24 @@
 
   public static ExecutionDriver create(
       EvaluationContext context,
-      ExecutableGraph<AppliedPTransform<?, ?, ?>, PCollection<?>> graph,
-      BundleProcessor<PCollection<?>, CommittedBundle<?>, AppliedPTransform<?, ?, ?>>
+      ExecutableGraph<PTransformNode, PCollectionNode> graph,
+      BundleProcessor<PCollectionNode, CommittedBundle<?>, PTransformNode>
           bundleProcessor,
       PipelineMessageReceiver messageReceiver,
-      Map<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>> initialBundles) {
+      Map<PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>> initialBundles) {
     return new QuiescenceDriver(context, graph, bundleProcessor, messageReceiver, initialBundles);
   }
 
   private final EvaluationContext evaluationContext;
-  private final ExecutableGraph<AppliedPTransform<?, ?, ?>, PCollection<?>> graph;
-  private final BundleProcessor<PCollection<?>, CommittedBundle<?>, AppliedPTransform<?, ?, ?>>
+  private final ExecutableGraph<PTransformNode, PCollectionNode> graph;
+  private final BundleProcessor<PCollectionNode, CommittedBundle<?>, PTransformNode>
       bundleProcessor;
   private final PipelineMessageReceiver pipelineMessageReceiver;
 
   private final CompletionCallback defaultCompletionCallback =
       new TimerIterableCompletionCallback(Collections.emptyList());
 
-  private final Map<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>>
+  private final Map<PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>>
       pendingRootBundles;
   private final Queue<WorkUpdate> pendingWork = new ConcurrentLinkedQueue<>();
 
@@ -80,11 +79,11 @@ public static ExecutionDriver create(
 
   private QuiescenceDriver(
       EvaluationContext evaluationContext,
-      ExecutableGraph<AppliedPTransform<?, ?, ?>, PCollection<?>> graph,
-      BundleProcessor<PCollection<?>, CommittedBundle<?>, AppliedPTransform<?, ?, ?>>
+      ExecutableGraph<PTransformNode, PCollectionNode> graph,
+      BundleProcessor<PCollectionNode, CommittedBundle<?>, PTransformNode>
           bundleProcessor,
       PipelineMessageReceiver pipelineMessageReceiver,
-      Map<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>>
+      Map<PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>>
           pendingRootBundles) {
     this.evaluationContext = evaluationContext;
     this.graph = graph;
@@ -138,7 +137,7 @@ private void applyUpdate(
       if (ExecutorState.ACTIVE == startingState
           || (ExecutorState.PROCESSING == startingState && noWorkOutstanding)) {
         CommittedBundle<?> bundle = update.getBundle().get();
-        for (AppliedPTransform<?, ?, ?> consumer : update.getConsumers()) {
+        for (PTransformNode consumer : update.getConsumers()) {
           outstandingWork.incrementAndGet();
           bundleProcessor.process(bundle, consumer, defaultCompletionCallback);
         }
@@ -154,19 +153,16 @@ private void applyUpdate(
   /** Fires any available timers. */
   private void fireTimers() {
     try {
-      for (FiredTimers<AppliedPTransform<?, ?, ?>> transformTimers :
-          evaluationContext.extractFiredTimers()) {
+      for (FiredTimers<PTransformNode> transformTimers : evaluationContext.extractFiredTimers()) {
         Collection<TimerData> delivery = transformTimers.getTimers();
         KeyedWorkItem<?, Object> work =
             KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery);
+        // TODO: Extract from graph
+        PCollectionNode inputPCollection = null;
         @SuppressWarnings({"unchecked", "rawtypes"})
         CommittedBundle<?> bundle =
             evaluationContext
-                .createKeyedBundle(
-                    transformTimers.getKey(),
-                    (PCollection)
-                        Iterables.getOnlyElement(
-                            transformTimers.getExecutable().getInputs().values()))
+                .createKeyedBundle(transformTimers.getKey(), inputPCollection)
                 .add(WindowedValue.valueInGlobalWindow(work))
                 .commit(evaluationContext.now());
         outstandingWork.incrementAndGet();
@@ -174,6 +170,7 @@ private void fireTimers() {
             bundle, transformTimers.getExecutable(), new TimerIterableCompletionCallback(delivery));
         state.set(ExecutorState.ACTIVE);
       }
+      throw new UnsupportedOperationException();
     } catch (Exception e) {
       LOG.error("Internal Error while delivering timers", e);
       pipelineMessageReceiver.failed(e);
@@ -190,7 +187,7 @@ private void addWorkIfNecessary() {
     // If any timers have fired, they will add more work; We don't need to add more
     if (state.get() == ExecutorState.QUIESCENT) {
       // All current TransformExecutors are blocked; add more work from the roots.
-      for (Map.Entry<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>>
+      for (Map.Entry<PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>>
           pendingRootEntry : pendingRootBundles.entrySet()) {
         Collection<CommittedBundle<?>> bundles = new ArrayList<>();
         // Pull all available work off of the queue, then schedule it all, so this loop
@@ -260,7 +257,7 @@ private void addWorkIfNecessary() {
     @Override
     public final CommittedResult handleResult(
         CommittedBundle<?> inputBundle, TransformResult<?> result) {
-      CommittedResult<AppliedPTransform<?, ?, ?>> committedResult =
+      CommittedResult<PTransformNode> committedResult =
           evaluationContext.handleResult(inputBundle, timers, result);
       for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
         pendingWork.offer(
@@ -287,7 +284,7 @@ public final CommittedResult handleResult(
     }
 
     @Override
-    public void handleEmpty(AppliedPTransform<?, ?, ?> transform) {
+    public void handleEmpty(PTransformNode transform) {
       outstandingWork.decrementAndGet();
     }
 
@@ -311,7 +308,7 @@ public void handleError(Error err) {
   @AutoValue
   abstract static class WorkUpdate {
     private static WorkUpdate fromBundle(
-        CommittedBundle<?> bundle, Collection<AppliedPTransform<?, ?, ?>> consumers) {
+        CommittedBundle<?> bundle, Collection<PTransformNode> consumers) {
       return new AutoValue_QuiescenceDriver_WorkUpdate(
           Optional.of(bundle), consumers, Optional.absent());
     }
@@ -328,7 +325,7 @@ private static WorkUpdate fromException(Exception e) {
      * Returns the transforms to process the bundle. If nonempty, {@link #getBundle()} will return a
      * present {@link Optional}.
      */
-    public abstract Collection<AppliedPTransform<?, ?, ?>> getConsumers();
+    public abstract Collection<PTransformNode> getConsumers();
 
     public abstract Optional<? extends Exception> getException();
   }
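
A minimal sketch (illustrative, not Beam code) of the drain-then-schedule pattern addWorkIfNecessary() relies on: everything currently queued is pulled off before any of it is processed, so bundles enqueued concurrently are picked up on a later pass rather than extending the current one indefinitely.

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.function.Consumer;

    final class DrainThenScheduleSketch<T> {
      private final Queue<T> pending = new ConcurrentLinkedQueue<>();

      void offer(T work) {
        pending.offer(work);
      }

      void drainTo(Consumer<T> processor) {
        Collection<T> batch = new ArrayList<>();
        T next;
        while ((next = pending.poll()) != null) {
          batch.add(next);          // collect everything currently available...
        }
        batch.forEach(processor);   // ...then hand the whole batch off
      }
    }
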
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RootInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RootInputProvider.java
index e0e24fcdf9d..9839672293c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RootInputProvider.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RootInputProvider.java
@@ -19,32 +19,27 @@
 package org.apache.beam.runners.direct.portable;
 
 import java.util.Collection;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
 
 /**
  * Provides {@link CommittedBundle bundles} that will be provided to the {@link PTransform
  * PTransforms} that are at the root of a {@link Pipeline}.
  */
-interface RootInputProvider<T, ShardT, InputT extends PInput> {
+interface RootInputProvider<ShardT> {
   /**
-   * Get the initial inputs for the {@link AppliedPTransform}. The {@link AppliedPTransform} will be
+   * Get the initial inputs for the {@link PTransformNode}. The {@link PTransformNode} will be
    * provided with these {@link CommittedBundle bundles} as input when the {@link Pipeline} runs.
    *
    * <p>For source transforms, these should be sufficient that, when provided to the evaluators
-   * produced by {@link TransformEvaluatorFactory#forApplication(AppliedPTransform,
-   * CommittedBundle)}, all of the elements contained in the source are eventually produced.
+   * produced by {@link TransformEvaluatorFactory#forApplication(PTransformNode, CommittedBundle)},
+   * all of the elements contained in the source are eventually produced.
    *
-   * @param transform the {@link AppliedPTransform} to get initial inputs for.
+   * @param transform the {@link PTransformNode} to get initial inputs for.
    * @param targetParallelism the target amount of parallelism to obtain from the source. Must be
    *     greater than or equal to 1.
    */
   Collection<CommittedBundle<ShardT>> getInitialInputs(
-      AppliedPTransform<InputT, PCollection<T>, PTransform<InputT, PCollection<T>>>
-          transform,
-      int targetParallelism)
-      throws Exception;
+      PTransformNode transform, int targetParallelism) throws Exception;
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RootProviderRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RootProviderRegistry.java
index 3ef64649740..2834db79a0f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RootProviderRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RootProviderRegistry.java
@@ -24,7 +24,7 @@
 import java.util.Collection;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.PTransform;
 
@@ -33,26 +33,23 @@
  * based on the type of {@link PTransform} of the application.
  */
 class RootProviderRegistry {
-  /**
-   * Returns a {@link RootProviderRegistry} that only supports the {@link Impulse} primitive.
-   */
+  /** Returns a {@link RootProviderRegistry} that only supports the {@link Impulse} primitive. */
   public static RootProviderRegistry impulseRegistry(EvaluationContext context) {
     return new RootProviderRegistry(
-        ImmutableMap.<String, RootInputProvider<?, ?, ?>>builder()
+        ImmutableMap.<String, RootInputProvider<?>>builder()
             .put(IMPULSE_TRANSFORM_URN, new ImpulseEvaluatorFactory.ImpulseRootProvider(context))
             .build());
   }
 
-  private final Map<String, RootInputProvider<?, ?, ?>> providers;
+  private final Map<String, RootInputProvider<?>> providers;
 
-  private RootProviderRegistry(
-      Map<String, RootInputProvider<?, ?, ?>> providers) {
+  private RootProviderRegistry(Map<String, RootInputProvider<?>> providers) {
     this.providers = providers;
   }
 
   public Collection<CommittedBundle<?>> getInitialInputs(
-      AppliedPTransform<?, ?, ?> transform, int targetParallelism) throws Exception {
-    String transformUrn = PTransformTranslation.urnForTransform(transform.getTransform());
+      PTransformNode transform, int targetParallelism) throws Exception {
+    String transformUrn = PTransformTranslation.urnForTransformOrNull(transform.getTransform());
     RootInputProvider provider =
         checkNotNull(
             providers.get(transformUrn),
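
Both registries in this package dispatch on the transform's URN. A generic sketch of that keyed-lookup-with-fail-fast pattern follows (names are illustrative, not Beam API):

    import static com.google.common.base.Preconditions.checkNotNull;

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;

    final class UrnRegistrySketch<T> {
      private final Map<String, T> providers;

      UrnRegistrySketch(Map<String, T> providers) {
        this.providers = ImmutableMap.copyOf(providers);
      }

      T forUrn(String urn) {
        // Fail fast with a readable message when no provider is registered for the URN.
        return checkNotNull(providers.get(urn), "No provider registered for URN \"%s\"", urn);
      }
    }
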
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/StepAndKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/StepAndKey.java
index 989ad4ae5c0..639e8acb2c9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/StepAndKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/StepAndKey.java
@@ -19,25 +19,25 @@
 
 import com.google.common.base.MoreObjects;
 import java.util.Objects;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.local.StructuralKey;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 
 /**
  * A (Step, Key) pair. This is useful as a map key or cache key for things that are available
  * per-step in a keyed manner (e.g. State).
  */
 final class StepAndKey {
-  private final AppliedPTransform<?, ?, ?> step;
+  private final PTransformNode step;
   private final StructuralKey<?> key;
 
   /**
    * Create a new {@link StepAndKey} with the provided step and key.
    */
-  public static StepAndKey of(AppliedPTransform<?, ?, ?> step, StructuralKey<?> key) {
+  public static StepAndKey of(PTransformNode step, StructuralKey<?> key) {
     return new StepAndKey(step, key);
   }
 
-  private StepAndKey(AppliedPTransform<?, ?, ?> step, StructuralKey<?> key) {
+  private StepAndKey(PTransformNode step, StructuralKey<?> key) {
     this.step = step;
     this.key = key;
   }
@@ -45,7 +45,7 @@ private StepAndKey(AppliedPTransform<?, ?, ?> step, StructuralKey<?> key) {
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(StepAndKey.class)
-        .add("step", step.getFullName())
+        .add("step", step.getId())
         .add("key", key.getKey())
         .toString();
   }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/StepTransformResult.java
index cb37106bdaf..8c9233c86f4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/StepTransformResult.java
@@ -23,10 +23,10 @@
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Set;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.core.metrics.MetricUpdates;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.portable.CommittedResult.OutputType;
-import org.apache.beam.runners.direct.portable.WatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.joda.time.Instant;
@@ -37,13 +37,11 @@
 @AutoValue
 abstract class StepTransformResult<InputT> implements TransformResult<InputT> {
 
-  public static <InputT> Builder<InputT> withHold(
-      AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
+  public static <InputT> Builder<InputT> withHold(PTransformNode transform, Instant watermarkHold) {
     return new Builder(transform, watermarkHold);
   }
 
-  public static <InputT> Builder<InputT> withoutHold(
-      AppliedPTransform<?, ?, ?> transform) {
+  public static <InputT> Builder<InputT> withoutHold(PTransformNode transform) {
     return new Builder(transform, BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
 
@@ -64,7 +62,7 @@
    * A builder for creating instances of {@link StepTransformResult}.
    */
   public static class Builder<InputT> {
-    private final AppliedPTransform<?, ?, ?> transform;
+    private final PTransformNode transform;
     private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
     private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElementsBuilder;
     private MetricUpdates metricUpdates;
@@ -73,7 +71,7 @@
     private final Set<OutputType> producedOutputs;
     private final Instant watermarkHold;
 
-    private Builder(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
+    private Builder(PTransformNode transform, Instant watermarkHold) {
       this.transform = transform;
       this.watermarkHold = watermarkHold;
       this.bundlesBuilder = ImmutableList.builder();
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformEvaluatorFactory.java
index 472d9ce4fd4..8565a241a4f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformEvaluatorFactory.java
@@ -18,9 +18,9 @@
 package org.apache.beam.runners.direct.portable;
 
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /**
@@ -47,13 +47,13 @@
    */
   @Nullable
   <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle)
+      PTransformNode application, CommittedBundle<?> inputBundle)
       throws Exception;
 
   /**
    * Cleans up any state maintained by this {@link TransformEvaluatorFactory}. Called after a
    * {@link Pipeline} is shut down. No more calls to
-   * {@link #forApplication(AppliedPTransform, CommittedBundle)} will be made after
+   * {@link #forApplication(PTransformNode, CommittedBundle)} will be made after
    * a call to {@link #cleanup()}.
    */
   void cleanup() throws Exception;
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformEvaluatorRegistry.java
index a3ce579ac6f..ce9693ba673 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformEvaluatorRegistry.java
@@ -25,7 +25,7 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,11 +49,11 @@ private TransformEvaluatorRegistry(
   }
 
   public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception {
+      PTransformNode application, CommittedBundle<?> inputBundle) throws Exception {
     checkState(
         !finished.get(), "Tried to get an evaluator for a finished TransformEvaluatorRegistry");
 
-    String urn = PTransformTranslation.urnForTransform(application.getTransform());
+    String urn = PTransformTranslation.urnForTransformOrNull(application.getTransform());
 
     TransformEvaluatorFactory factory =
         checkNotNull(factories.get(urn), "No evaluator for PTransform \"%s\"", urn);
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformExecutorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformExecutorFactory.java
index b55fac9024f..f41afeb457f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformExecutorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformExecutorFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.beam.runners.direct.portable;
 
-import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
 /**
  * A Factory for creating {@link TransformExecutor Transform Executors} on an input.
@@ -26,7 +26,7 @@
 interface TransformExecutorFactory {
   TransformExecutor create(
       CommittedBundle<?> bundle,
-      AppliedPTransform<?, ?, ?> transform,
+      PTransformNode transform,
       CompletionCallback onComplete,
       TransformExecutorService executorService);
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformResult.java
index e2ced072e1f..fa8e95c4c82 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformResult.java
@@ -19,10 +19,10 @@
 
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.core.metrics.MetricUpdates;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.portable.CommittedResult.OutputType;
-import org.apache.beam.runners.direct.portable.WatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -30,19 +30,19 @@
 import org.joda.time.Instant;
 
 /**
- * The result of evaluating an {@link AppliedPTransform} with a {@link TransformEvaluator}.
+ * The result of evaluating a {@link PTransformNode} with a {@link TransformEvaluator}.
  *
  * <p>Every transform evaluator has a defined input type, but {@link ParDo} has multiple outputs
  * so there is not necessarily a defined output type.
  */
 interface TransformResult<InputT> {
   /**
-   * Returns the {@link AppliedPTransform} that produced this result.
+   * Returns the {@link PTransformNode} that produced this result.
    *
    * <p>This is treated as an opaque identifier so evaluators can delegate to other evaluators
    * that may not have compatible types.
    */
-  AppliedPTransform<?, ?, ?> getTransform();
+  PTransformNode getTransform();
 
   /**
    * Returns the {@link UncommittedBundle (uncommitted) Bundles} output by this transform. These
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/UncommittedBundle.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/UncommittedBundle.java
index b3d190109b8..7eb50ec058a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/UncommittedBundle.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/UncommittedBundle.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.direct.portable;
 
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
@@ -36,7 +37,7 @@
    * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to.
    */
   @Nullable
-  PCollection<T> getPCollection();
+  PCollectionNode getPCollection();
 
   /**
    * Outputs an element to this bundle.
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/WatermarkCallbackExecutor.java
index f77c1054e5d..cf78d16b58c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/WatermarkCallbackExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/WatermarkCallbackExecutor.java
@@ -24,7 +24,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
-import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Instant;
@@ -33,16 +33,16 @@
  * Executes callbacks that occur based on the progression of the watermark per-step.
  *
  * <p>Callbacks are registered by calls to
- * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)},
- * and are executed after a call to {@link #fireForWatermark(AppliedPTransform, Instant)} with the
- * same {@link AppliedPTransform} and a watermark sufficient to ensure that the trigger for the
+ * {@link #callOnGuaranteedFiring(PTransformNode, BoundedWindow, WindowingStrategy, Runnable)},
+ * and are executed after a call to {@link #fireForWatermark(PTransformNode, Instant)} with the
+ * same {@link PTransformNode} and a watermark sufficient to ensure that the trigger for the
  * windowing strategy would have been produced.
  *
  * <p>NOTE: {@link WatermarkCallbackExecutor} does not track the latest observed watermark for any
- * {@link AppliedPTransform} - any call to
- * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)}
+ * {@link PTransformNode} - any call to
+ * {@link #callOnGuaranteedFiring(PTransformNode, BoundedWindow, WindowingStrategy, Runnable)}
  * that could have potentially already fired should be followed by a call to
- * {@link #fireForWatermark(AppliedPTransform, Instant)} for the same transform with the current
+ * {@link #fireForWatermark(PTransformNode, Instant)} for the same transform with the current
  * value of the watermark.
  */
 class WatermarkCallbackExecutor {
@@ -53,7 +53,7 @@ public static WatermarkCallbackExecutor create(Executor executor) {
     return new WatermarkCallbackExecutor(executor);
   }
 
-  private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>>
+  private final ConcurrentMap<PTransformNode, PriorityQueue<WatermarkCallback>>
       callbacks;
   private final Executor executor;
 
@@ -64,11 +64,11 @@ private WatermarkCallbackExecutor(Executor executor) {
 
   /**
    * Execute the provided {@link Runnable} after the next call to
-   * {@link #fireForWatermark(AppliedPTransform, Instant)} where the window is guaranteed to have
+   * {@link #fireForWatermark(PTransformNode, Instant)} where the window is guaranteed to have
    * produced output.
    */
   public void callOnGuaranteedFiring(
-      AppliedPTransform<?, ?, ?> step,
+      PTransformNode step,
       BoundedWindow window,
       WindowingStrategy<?, ?> windowingStrategy,
       Runnable runnable) {
@@ -90,11 +90,11 @@ public void callOnGuaranteedFiring(
 
   /**
    * Execute the provided {@link Runnable} after the next call to
-   * {@link #fireForWatermark(AppliedPTransform, Instant)} where the window
+   * {@link #fireForWatermark(PTransformNode, Instant)} where the window
    * is guaranteed to be expired.
    */
   public void callOnWindowExpiration(
-      AppliedPTransform<?, ?, ?> step,
+      PTransformNode step,
       BoundedWindow window,
       WindowingStrategy<?, ?> windowingStrategy,
       Runnable runnable) {
@@ -118,7 +118,7 @@ public void callOnWindowExpiration(
    * Schedule all pending callbacks that must have produced output by the time of the provided
    * watermark.
    */
-  public void fireForWatermark(AppliedPTransform<?, ?, ?> step, Instant watermark) {
+  public void fireForWatermark(PTransformNode step, Instant watermark) {
     PriorityQueue<WatermarkCallback> callbackQueue = callbacks.get(step);
     if (callbackQueue == null) {
       return;
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/WatermarkManager.java
deleted file mode 100644
index fce2a42b76b..00000000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/WatermarkManager.java
+++ /dev/null
@@ -1,1568 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct.portable;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.SortedMultiset;
-import com.google.common.collect.Table;
-import com.google.common.collect.TreeMultiset;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Objects;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.direct.ExecutableGraph;
-import org.apache.beam.runners.local.Bundle;
-import org.apache.beam.runners.local.StructuralKey;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Instant;
-
-/**
- * Manages watermarks of {@link PCollection PCollections} and input and output watermarks of
- * {@link AppliedPTransform AppliedPTransforms} to provide event-time and completion tracking for
- * in-memory execution. {@link WatermarkManager} is designed to update and return a
- * consistent view of watermarks in the presence of concurrent updates.
- *
- * <p>An {@link WatermarkManager} is provided with the collection of root
- * {@link AppliedPTransform AppliedPTransforms} and a map of {@link PCollection PCollections} to
- * all the {@link AppliedPTransform AppliedPTransforms} that consume them at construction time.
- *
- * <p>Whenever a root {@link AppliedPTransform executable} produces elements, the
- * {@link WatermarkManager} is provided with the produced elements and the output watermark
- * of the producing {@link AppliedPTransform executable}. The
- * {@link WatermarkManager watermark manager} is responsible for computing the watermarks
- * of all {@link AppliedPTransform transforms} that consume one or more
- * {@link PCollection PCollections}.
- *
- * <p>Whenever a non-root {@link AppliedPTransform} finishes processing one or more in-flight
- * elements (referred to as the input {@link CommittedBundle bundle}), the following occurs
- * atomically:
- * <ul>
- *  <li>All of the in-flight elements are removed from the collection of pending elements for the
- *      {@link AppliedPTransform}.</li>
- *  <li>All of the elements produced by the {@link AppliedPTransform} are added to the collection
- *      of pending elements for each {@link AppliedPTransform} that consumes them.</li>
- *  <li>The input watermark for the {@link AppliedPTransform} becomes the maximum value of
- *    <ul>
- *      <li>the previous input watermark</li>
- *      <li>the minimum of
- *        <ul>
- *          <li>the timestamps of all currently pending elements</li>
- *          <li>all input {@link PCollection} watermarks</li>
- *        </ul>
- *      </li>
- *    </ul>
- *  </li>
- *  <li>The output watermark for the {@link AppliedPTransform} becomes the maximum of
- *    <ul>
- *      <li>the previous output watermark</li>
- *      <li>the minimum of
- *        <ul>
- *          <li>the current input watermark</li>
- *          <li>the current watermark holds</li>
- *        </ul>
- *      </li>
- *    </ul>
- *  </li>
- *  <li>The watermark of the output {@link PCollection} can be advanced to the output watermark of
- *      the {@link AppliedPTransform}</li>
- *  <li>The watermark of all downstream {@link AppliedPTransform AppliedPTransforms} can be
- *      advanced.</li>
- * </ul>
- *
- * <p>The watermark of a {@link PCollection} is equal to the output watermark of the
- * {@link AppliedPTransform} that produces it.
- *
- * <p>The watermarks for a {@link PTransform} are updated as follows when output is committed:<pre>
- * Watermark_In'  = MAX(Watermark_In, MIN(U(TS_Pending), U(Watermark_InputPCollection)))
- * Watermark_Out' = MAX(Watermark_Out, MIN(Watermark_In', U(StateHold)))
- * Watermark_PCollection = Watermark_Out_ProducingPTransform
- * </pre>
- */
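
To read these formulas concretely (illustrative numbers only, not from any real pipeline): if the pending elements' minimum timestamp is 12:00, the input PCollection watermark is 12:03, the previous input watermark was 11:59, and the earliest state hold is 12:01, then Watermark_In' = MAX(11:59, MIN(12:00, 12:03)) = 12:00 and Watermark_Out' = MAX(Watermark_Out, MIN(12:00, 12:01)) = 12:00, assuming the previous output watermark was no later.
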
-class WatermarkManager<ExecutableT, CollectionT> {
-  // The number of updates to apply in #tryApplyPendingUpdates
-  private static final int MAX_INCREMENTAL_UPDATES = 10;
-
-
-  /**
-   * The watermark of some {@link Pipeline} element, usually a {@link PTransform} or a
-   * {@link PCollection}.
-   *
-   * <p>A watermark is a monotonically increasing value, which represents the point up to which the
-   * system believes it has received all of the data. Data that arrives with a timestamp that is
-   * before the watermark is considered late. {@link BoundedWindow#TIMESTAMP_MAX_VALUE} is a special
-   * timestamp which indicates we have received all of the data and there will be no more on-time or
-   * late data. This value is represented by {@link WatermarkManager#THE_END_OF_TIME}.
-   */
-  @VisibleForTesting interface Watermark {
-    /**
-     * Returns the current value of this watermark.
-     */
-    Instant get();
-
-    /**
-     * Refreshes the value of this watermark from its input watermarks and watermark holds.
-     *
-     * @return true if the value of the watermark has changed (and thus dependent watermarks must
-     *         also be updated)
-     */
-    WatermarkUpdate refresh();
-  }
-
-  /**
-   * The result of computing a {@link Watermark}.
-   */
-  private enum WatermarkUpdate {
-    /** The watermark is later than the value at the previous time it was computed. */
-    ADVANCED(true),
-    /** The watermark is equal to the value at the previous time it was computed. */
-    NO_CHANGE(false);
-
-    private final boolean advanced;
-
-    private WatermarkUpdate(boolean advanced) {
-      this.advanced = advanced;
-    }
-
-    public boolean isAdvanced() {
-      return advanced;
-    }
-
-    /**
-     * Returns the {@link WatermarkUpdate} that is a result of combining the two watermark updates.
-     *
-     * <p>If either of the input {@link WatermarkUpdate WatermarkUpdates} were advanced, the result
-     * {@link WatermarkUpdate} has been advanced.
-     */
-    public WatermarkUpdate union(WatermarkUpdate that) {
-      if (this.advanced) {
-        return this;
-      }
-      return that;
-    }
-
-    /**
-     * Returns the {@link WatermarkUpdate} based on the former and current
-     * {@link Instant timestamps}.
-     */
-    public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTime) {
-      if (currentTime.isAfter(oldTime)) {
-        return ADVANCED;
-      }
-      return NO_CHANGE;
-    }
-  }
-
-  /**
-   * The input {@link Watermark} of an {@link AppliedPTransform}.
-   *
-   * <p>At any point, the value of an {@link AppliedPTransformInputWatermark} is equal to the
-   * minimum watermark across all of its input {@link Watermark Watermarks}, and the minimum
-   * timestamp of all of the pending elements, restricted to be monotonically increasing.
-   *
-   * <p>See {@link #refresh()} for more information.
-   */
-  @VisibleForTesting static class AppliedPTransformInputWatermark implements Watermark {
-    private final Collection<? extends Watermark> inputWatermarks;
-    private final SortedMultiset<Bundle<?, ?>> pendingElements;
-
-    // This tracks only the quantity of timers at each timestamp, for quickly getting the cross-key
-    // minimum
-    private final SortedMultiset<TimerData> pendingTimers;
-
-    // Entries in this table represent the authoritative timestamp for which
-    // a per-key-and-StateNamespace timer is set.
-    private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerData>> existingTimers;
-
-    // This per-key sorted set allows quick retrieval of timers that should fire for a key
-    private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
-
-    private AtomicReference<Instant> currentWatermark;
-
-    public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWatermarks) {
-      this.inputWatermarks = inputWatermarks;
-      // The ordering must order elements by timestamp, and must not compare two distinct elements
-      // as equal. This is built on the assumption that any element added as a pending element will
-      // be consumed without modifications.
-      //
-      // The same logic is applied for pending timers
-      Ordering<Bundle<?, ?>> pendingBundleComparator =
-          new BundleByElementTimestampComparator().compound(Ordering.arbitrary());
-      this.pendingElements =
-          TreeMultiset.create(pendingBundleComparator);
-      this.pendingTimers = TreeMultiset.create();
-      this.objectTimers = new HashMap<>();
-      this.existingTimers = new HashMap<>();
-      currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
-    }
-
-    @Override
-    public Instant get() {
-      return currentWatermark.get();
-    }
-
-    /**
-     * {@inheritDoc}.
-     *
-     * <p>When refresh is called, the value of the {@link AppliedPTransformInputWatermark} becomes
-     * equal to the maximum value of
-     * <ul>
-     *   <li>the previous input watermark</li>
-     *   <li>the minimum of
-     *     <ul>
-     *       <li>the timestamps of all currently pending elements</li>
-     *       <li>all input {@link PCollection} watermarks</li>
-     *     </ul>
-     *   </li>
-     * </ul>
-     */
-    @Override
-    public synchronized WatermarkUpdate refresh() {
-      Instant oldWatermark = currentWatermark.get();
-      Instant minInputWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
-      for (Watermark inputWatermark : inputWatermarks) {
-        minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, inputWatermark.get());
-      }
-      if (!pendingElements.isEmpty()) {
-        minInputWatermark =
-            INSTANT_ORDERING.min(
-                minInputWatermark, pendingElements.firstEntry().getElement().getMinimumTimestamp());
-      }
-      Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark);
-      currentWatermark.set(newWatermark);
-      return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
-    }
-
-    private synchronized void addPending(Bundle<?, ?> newPending) {
-      pendingElements.add(newPending);
-    }
-
-    private synchronized void removePending(Bundle<?, ?> completed) {
-      pendingElements.remove(completed);
-    }
-
-    @VisibleForTesting synchronized Instant getEarliestTimerTimestamp() {
-      if (pendingTimers.isEmpty()) {
-        return BoundedWindow.TIMESTAMP_MAX_VALUE;
-      } else {
-        return pendingTimers.firstEntry().getElement().getTimestamp();
-      }
-    }
-
-    @VisibleForTesting synchronized void updateTimers(TimerUpdate update) {
-      NavigableSet<TimerData> keyTimers =
-          objectTimers.computeIfAbsent(update.key, k -> new TreeSet<>());
-      Table<StateNamespace, String, TimerData> existingTimersForKey =
-              existingTimers.computeIfAbsent(update.key, k -> HashBasedTable.create());
-
-      for (TimerData timer : update.getSetTimers()) {
-        if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
-          @Nullable
-          TimerData existingTimer =
-              existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
-
-          if (existingTimer == null) {
-            pendingTimers.add(timer);
-            keyTimers.add(timer);
-          } else if (!existingTimer.equals(timer)) {
-            pendingTimers.remove(existingTimer);
-            keyTimers.remove(existingTimer);
-            pendingTimers.add(timer);
-            keyTimers.add(timer);
-          } // else the timer is already set identically, so noop
-
-          existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer);
-        }
-      }
-
-      for (TimerData timer : update.getDeletedTimers()) {
-        if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
-          @Nullable
-          TimerData existingTimer =
-              existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
-
-          if (existingTimer != null) {
-            pendingTimers.remove(existingTimer);
-            keyTimers.remove(existingTimer);
-            existingTimersForKey.remove(existingTimer.getNamespace(), existingTimer.getTimerId());
-          }
-        }
-      }
-
-      for (TimerData timer : update.getCompletedTimers()) {
-        if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
-          keyTimers.remove(timer);
-          pendingTimers.remove(timer);
-        }
-      }
-    }
-
-    @VisibleForTesting
-    synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredEventTimeTimers() {
-      return extractFiredTimers(currentWatermark.get(), objectTimers);
-    }
-
-    @Override
-    public synchronized String toString() {
-      return MoreObjects.toStringHelper(AppliedPTransformInputWatermark.class)
-          .add("pendingElements", pendingElements)
-          .add("currentWatermark", currentWatermark)
-          .toString();
-    }
-  }
-
-  /**
-   * The output {@link Watermark} of an {@link AppliedPTransform}.
-   *
-   * <p>The value of an {@link AppliedPTransformOutputWatermark} is equal to the minimum of the
-   * current watermark hold and the {@link AppliedPTransformInputWatermark} for the same
-   * {@link AppliedPTransform}, restricted to be monotonically increasing. See
-   * {@link #refresh()} for more information.
-   */
-  private static class AppliedPTransformOutputWatermark implements Watermark {
-    private final AppliedPTransformInputWatermark inputWatermark;
-    private final PerKeyHolds holds;
-    private AtomicReference<Instant> currentWatermark;
-
-    public AppliedPTransformOutputWatermark(
-        AppliedPTransformInputWatermark inputWatermark) {
-      this.inputWatermark = inputWatermark;
-      holds = new PerKeyHolds();
-      currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
-    }
-
-    public synchronized void updateHold(Object key, Instant newHold) {
-      if (newHold == null) {
-        holds.removeHold(key);
-      } else {
-        holds.updateHold(key, newHold);
-      }
-    }
-
-    @Override
-    public Instant get() {
-      return currentWatermark.get();
-    }
-
-    /**
-     * {@inheritDoc}.
-     *
-     * <p>When refresh is called, the value of the {@link AppliedPTransformOutputWatermark} becomes
-     * equal to the maximum value of:
-     * <ul>
-     *   <li>the previous output watermark</li>
-     *   <li>the minimum of
-     *     <ul>
-     *       <li>the current input watermark</li>
-     *       <li>the current watermark holds</li>
-     *     </ul>
-     *   </li>
-     * </ul>
-     */
-    @Override
-    public synchronized WatermarkUpdate refresh() {
-      Instant oldWatermark = currentWatermark.get();
-      Instant newWatermark = INSTANT_ORDERING.min(
-          inputWatermark.get(),
-          inputWatermark.getEarliestTimerTimestamp(),
-          holds.getMinHold());
-      newWatermark = INSTANT_ORDERING.max(oldWatermark, newWatermark);
-      currentWatermark.set(newWatermark);
-      return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
-    }
-
-    @Override
-    public synchronized String toString() {
-      return MoreObjects.toStringHelper(AppliedPTransformOutputWatermark.class)
-          .add("holds", holds)
-          .add("currentWatermark", currentWatermark)
-          .toString();
-    }
-  }
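The refresh rule above is easier to see in isolation. Below is a standalone sketch (not part of the PR diff; plain long millis stand in for Joda Instants, and all names are hypothetical) of the same computation: the new output watermark is the maximum of the previous output watermark and the minimum of the input watermark, the earliest pending timer, and the earliest hold.

    // Illustrative sketch only: mirrors the refresh rule described above,
    // with long millis in place of Instants. Not part of the PR diff.
    final class OutputWatermarkSketch {
        private long currentWatermark = Long.MIN_VALUE;

        long refresh(long inputWatermark, long earliestTimer, long minHold) {
            long candidate = Math.min(inputWatermark, Math.min(earliestTimer, minHold));
            // The output watermark is restricted to be monotonically increasing.
            currentWatermark = Math.max(currentWatermark, candidate);
            return currentWatermark;
        }

        public static void main(String[] args) {
            OutputWatermarkSketch wm = new OutputWatermarkSketch();
            // Input watermark at t=100, no pending timers, a hold at t=40: the hold wins.
            System.out.println(wm.refresh(100, Long.MAX_VALUE, 40));             // 40
            // Hold released: the watermark advances to the input watermark, never regressing.
            System.out.println(wm.refresh(100, Long.MAX_VALUE, Long.MAX_VALUE)); // 100
        }
    }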
-
-  /**
-   * The input {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} hold for an
-   * {@link AppliedPTransform}.
-   *
-   * <p>At any point, the hold value of a {@link SynchronizedProcessingTimeInputWatermark} is equal
-   * to the minimum across all pending bundles at the {@link AppliedPTransform} and all upstream
-   * {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} watermarks. The value of the input
-   * synchronized processing time at any step is equal to the maximum of:
-   * <ul>
-   *   <li>The most recently returned synchronized processing input time
-   *   <li>The minimum of
-   *     <ul>
-   *       <li>The current processing time
-   *       <li>The current synchronized processing time input hold
-   *     </ul>
-   * </ul>
-   */
-  private static class SynchronizedProcessingTimeInputWatermark implements Watermark {
-    private final Collection<? extends Watermark> inputWms;
-    private final Collection<Bundle<?, ?>> pendingBundles;
-    private final Map<StructuralKey<?>, NavigableSet<TimerData>> processingTimers;
-    private final Map<StructuralKey<?>, NavigableSet<TimerData>> synchronizedProcessingTimers;
-    private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerData>> existingTimers;
-
-    private final NavigableSet<TimerData> pendingTimers;
-
-    private AtomicReference<Instant> earliestHold;
-
-    public SynchronizedProcessingTimeInputWatermark(Collection<? extends Watermark> inputWms) {
-      this.inputWms = inputWms;
-      this.pendingBundles = new HashSet<>();
-      this.processingTimers = new HashMap<>();
-      this.synchronizedProcessingTimers = new HashMap<>();
-      this.existingTimers = new HashMap<>();
-      this.pendingTimers = new TreeSet<>();
-      Instant initialHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
-      for (Watermark wm : inputWms) {
-        initialHold = INSTANT_ORDERING.min(initialHold, wm.get());
-      }
-      earliestHold = new AtomicReference<>(initialHold);
-    }
-
-    @Override
-    public Instant get() {
-      return earliestHold.get();
-    }
-
-    /**
-     * {@inheritDoc}.
-     *
-     * <p>When refresh is called, the value of the {@link SynchronizedProcessingTimeInputWatermark}
-     * becomes equal to the minimum value of
-     * <ul>
-     *   <li>the timestamps of all currently pending bundles</li>
-     *   <li>all input {@link PCollection} synchronized processing time watermarks</li>
-     * </ul>
-     *
-     * <p>Note that this value is not monotonic, but the returned value for the synchronized
-     * processing time must be.
-     */
-    @Override
-    public synchronized WatermarkUpdate refresh() {
-      Instant oldHold = earliestHold.get();
-      Instant minTime = THE_END_OF_TIME.get();
-      for (Watermark input : inputWms) {
-        minTime = INSTANT_ORDERING.min(minTime, input.get());
-      }
-      for (Bundle<?, ?> bundle : pendingBundles) {
-        // TODO: Track elements in the bundle by the processing time they were output instead of
-        // entire bundles. Required to support arbitrarily splitting and merging bundles between
-        // steps
-        minTime = INSTANT_ORDERING.min(minTime, bundle.getSynchronizedProcessingOutputWatermark());
-      }
-      earliestHold.set(minTime);
-      return WatermarkUpdate.fromTimestamps(oldHold, minTime);
-    }
-
-    public synchronized void addPending(Bundle<?, ?> bundle) {
-      pendingBundles.add(bundle);
-    }
-
-    public synchronized void removePending(Bundle<?, ?> bundle) {
-      pendingBundles.remove(bundle);
-    }
-
-    /**
-     * Returns the earliest timestamp of any timer that has not yet been completed: either a timer
-     * that has not yet fired, or a timer that has been delivered to the step but has not yet been
-     * marked complete.
-     */
-    public synchronized Instant getEarliestTimerTimestamp() {
-      Instant earliest = THE_END_OF_TIME.get();
-      for (NavigableSet<TimerData> timers : processingTimers.values()) {
-        if (!timers.isEmpty()) {
-          earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest);
-        }
-      }
-      for (NavigableSet<TimerData> timers : synchronizedProcessingTimers.values()) {
-        if (!timers.isEmpty()) {
-          earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest);
-        }
-      }
-      if (!pendingTimers.isEmpty()) {
-        earliest = INSTANT_ORDERING.min(pendingTimers.first().getTimestamp(), earliest);
-      }
-      return earliest;
-    }
-
-    private synchronized void updateTimers(TimerUpdate update) {
-      Map<TimeDomain, NavigableSet<TimerData>> timerMap = timerMap(update.key);
-      Table<StateNamespace, String, TimerData> existingTimersForKey =
-          existingTimers.computeIfAbsent(update.key, k -> HashBasedTable.create());
-
-      for (TimerData addedTimer : update.setTimers) {
-        NavigableSet<TimerData> timerQueue = timerMap.get(addedTimer.getDomain());
-        if (timerQueue == null) {
-          continue;
-        }
-
-        @Nullable
-        TimerData existingTimer =
-            existingTimersForKey.get(addedTimer.getNamespace(), addedTimer.getTimerId());
-        if (existingTimer == null) {
-          timerQueue.add(addedTimer);
-        } else if (!existingTimer.equals(addedTimer)) {
-          timerQueue.remove(existingTimer);
-          timerQueue.add(addedTimer);
-        } // else the timer is already set identically, so noop.
-
-        existingTimersForKey.put(addedTimer.getNamespace(), addedTimer.getTimerId(), addedTimer);
-      }
-
-      for (TimerData deletedTimer : update.deletedTimers) {
-        NavigableSet<TimerData> timerQueue = timerMap.get(deletedTimer.getDomain());
-        if (timerQueue == null) {
-          continue;
-        }
-
-        @Nullable
-        TimerData existingTimer =
-            existingTimersForKey.get(deletedTimer.getNamespace(), deletedTimer.getTimerId());
-
-        if (existingTimer != null) {
-          pendingTimers.remove(deletedTimer);
-          timerQueue.remove(deletedTimer);
-          existingTimersForKey.remove(existingTimer.getNamespace(), existingTimer.getTimerId());
-        }
-      }
-
-      for (TimerData completedTimer : update.completedTimers) {
-        pendingTimers.remove(completedTimer);
-      }
-    }
-
-    private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredDomainTimers(
-        TimeDomain domain, Instant firingTime) {
-      Map<StructuralKey<?>, List<TimerData>> firedTimers;
-      switch (domain) {
-        case PROCESSING_TIME:
-          firedTimers = extractFiredTimers(firingTime, processingTimers);
-          break;
-        case SYNCHRONIZED_PROCESSING_TIME:
-          firedTimers =
-              extractFiredTimers(
-                  INSTANT_ORDERING.min(firingTime, earliestHold.get()),
-                  synchronizedProcessingTimers);
-          break;
-        default:
-          throw new IllegalArgumentException(
-              "Called getFiredTimers on a Synchronized Processing Time watermark"
-                  + " and gave a non-processing time domain "
-                  + domain);
-      }
-      for (Map.Entry<StructuralKey<?>, ? extends Collection<TimerData>> firedTimer :
-          firedTimers.entrySet()) {
-        pendingTimers.addAll(firedTimer.getValue());
-      }
-      return firedTimers;
-    }
-
-    private Map<TimeDomain, NavigableSet<TimerData>> timerMap(StructuralKey<?> key) {
-      NavigableSet<TimerData> processingQueue =
-          processingTimers.computeIfAbsent(key, k -> new TreeSet<>());
-      NavigableSet<TimerData> synchronizedProcessingQueue =
-              synchronizedProcessingTimers.computeIfAbsent(key, k -> new TreeSet<>());
-      EnumMap<TimeDomain, NavigableSet<TimerData>> result = new EnumMap<>(TimeDomain.class);
-      result.put(TimeDomain.PROCESSING_TIME, processingQueue);
-      result.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, synchronizedProcessingQueue);
-      return result;
-    }
-
-    @Override
-    public synchronized String toString() {
-      return MoreObjects.toStringHelper(SynchronizedProcessingTimeInputWatermark.class)
-          .add("earliestHold", earliestHold)
-          .toString();
-    }
-  }
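The two-step behavior described above (the hold is the minimum over upstream synchronized processing watermarks and pending bundles, while the reported time is kept monotonic) can be seen in a compact standalone sketch, not part of the PR diff, using plain longs and hypothetical names:

    // Illustrative sketch only, not part of the PR diff: a simplified model of the
    // synchronized processing time hold and the monotonic value reported from it.
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    final class SynchronizedProcessingTimeSketch {
        private long latestReported = Long.MIN_VALUE;

        // The hold: the minimum across upstream holds and all pending bundles.
        static long hold(List<Long> upstreamHolds, List<Long> pendingBundleTimes) {
            long min = Long.MAX_VALUE;
            for (long t : upstreamHolds) { min = Math.min(min, t); }
            for (long t : pendingBundleTimes) { min = Math.min(min, t); }
            return min;
        }

        // The reported value: clamped against the previous report so it never regresses.
        long reportedInputTime(long now, long hold) {
            latestReported = Math.max(latestReported, Math.min(now, hold));
            return latestReported;
        }

        public static void main(String[] args) {
            SynchronizedProcessingTimeSketch s = new SynchronizedProcessingTimeSketch();
            List<Long> upstream = Arrays.asList(50L);
            List<Long> pending = new ArrayList<>(Arrays.asList(30L));
            System.out.println(s.reportedInputTime(100, hold(upstream, pending))); // 30
            pending.clear(); // the pending bundle completes
            System.out.println(s.reportedInputTime(100, hold(upstream, pending))); // 50
        }
    }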
-
-  /**
-   * The output {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} hold for an
-   * {@link AppliedPTransform}.
-   *
-   * <p>At any point, the hold value of a {@link SynchronizedProcessingTimeOutputWatermark} is
-   * equal to the minimum across all incomplete timers at the {@link AppliedPTransform} and all
-   * upstream {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} watermarks. The value of the output
-   * synchronized processing time at any step is equal to the maximum of:
-   * <ul>
-   *   <li>The most recently returned synchronized processing output time
-   *   <li>The minimum of
-   *     <ul>
-   *       <li>The current processing time
-   *       <li>The current synchronized processing time output hold
-   *     </ul>
-   * </ul>
-   */
-  private static class SynchronizedProcessingTimeOutputWatermark implements Watermark {
-    private final SynchronizedProcessingTimeInputWatermark inputWm;
-    private AtomicReference<Instant> latestRefresh;
-
-    public SynchronizedProcessingTimeOutputWatermark(
-        SynchronizedProcessingTimeInputWatermark inputWm) {
-      this.inputWm = inputWm;
-      this.latestRefresh = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
-    }
-
-    @Override
-    public Instant get() {
-      return latestRefresh.get();
-    }
-
-    /**
-     * {@inheritDoc}.
-     *
-     * <p>When refresh is called, the value of the {@link SynchronizedProcessingTimeOutputWatermark}
-     * becomes equal to the minimum value of:
-     * <ul>
-     *   <li>the current input watermark.
-     *   <li>all {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} timers that are based on the input
-     *       watermark.
-     *   <li>all {@link TimeDomain#PROCESSING_TIME} timers that are based on the input watermark.
-     * </ul>
-     *
-     * <p>Note that this value is not monotonic, but the returned value for the synchronized
-     * processing time must be.
-     */
-    @Override
-    public synchronized WatermarkUpdate refresh() {
-      // Hold the output synchronized processing time to the input watermark, which takes into
-      // account buffered bundles, and the earliest pending timer, which determines what to hold
-      // downstream timers to.
-      Instant oldRefresh = latestRefresh.get();
-      Instant newTimestamp =
-          INSTANT_ORDERING.min(inputWm.get(), inputWm.getEarliestTimerTimestamp());
-      latestRefresh.set(newTimestamp);
-      return WatermarkUpdate.fromTimestamps(oldRefresh, newTimestamp);
-    }
-
-    @Override
-    public synchronized String toString() {
-      return MoreObjects.toStringHelper(SynchronizedProcessingTimeOutputWatermark.class)
-          .add("latestRefresh", latestRefresh)
-          .toString();
-    }
-  }
-
-  /**
-   * The {@code Watermark} that is after the latest time it is possible to represent in the global
-   * window. This is a distinguished value representing a complete {@link PTransform}.
-   */
-  private static final Watermark THE_END_OF_TIME = new Watermark() {
-        @Override
-        public WatermarkUpdate refresh() {
-          // THE_END_OF_TIME is a distinguished value that cannot be advanced.
-          return WatermarkUpdate.NO_CHANGE;
-        }
-
-        @Override
-        public Instant get() {
-          return BoundedWindow.TIMESTAMP_MAX_VALUE;
-        }
-      };
-
-  private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural();
-
-  /**
-   * For each (Object, NavigableSet) pair in the provided map, remove each Timer that is before the
-   * latestTime argument and put it in the result under the same key, then remove all of the keys
-   * which have no more pending timers.
-   *
-   * <p>The result collection retains ordering of timers (from earliest to latest).
-   */
-  private static Map<StructuralKey<?>, List<TimerData>> extractFiredTimers(
-      Instant latestTime, Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers) {
-    Map<StructuralKey<?>, List<TimerData>> result = new HashMap<>();
-    Set<StructuralKey<?>> emptyKeys = new HashSet<>();
-    for (Map.Entry<StructuralKey<?>, NavigableSet<TimerData>> pendingTimers :
-        objectTimers.entrySet()) {
-      NavigableSet<TimerData> timers = pendingTimers.getValue();
-      if (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) {
-        ArrayList<TimerData> keyFiredTimers = new ArrayList<>();
-        result.put(pendingTimers.getKey(), keyFiredTimers);
-        while (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) {
-          keyFiredTimers.add(timers.first());
-          timers.remove(timers.first());
-        }
-      }
-      if (timers.isEmpty()) {
-        emptyKeys.add(pendingTimers.getKey());
-      }
-    }
-    objectTimers.keySet().removeAll(emptyKeys);
-    return result;
-  }
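The extraction helper above preserves per-key timer ordering and prunes keys whose pending sets become empty. A standalone sketch of the same algorithm, not part of the PR diff, with String keys and long timestamps standing in for StructuralKey and TimerData:

    // Illustrative sketch only, not part of the PR diff: extract every timer strictly
    // before latestTime, per key and in order, and drop keys with nothing left pending.
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeSet;

    final class ExtractFiredTimersSketch {
        static Map<String, List<Long>> extractFired(
                long latestTime, Map<String, TreeSet<Long>> perKeyTimers) {
            Map<String, List<Long>> fired = new HashMap<>();
            perKeyTimers.entrySet().removeIf(entry -> {
                TreeSet<Long> timers = entry.getValue();
                List<Long> keyFired = new ArrayList<>();
                // TreeSet keeps timers sorted, so fired timers come out earliest-first.
                while (!timers.isEmpty() && timers.first() < latestTime) {
                    keyFired.add(timers.pollFirst());
                }
                if (!keyFired.isEmpty()) {
                    fired.put(entry.getKey(), keyFired);
                }
                // Returning true removes keys that no longer have pending timers.
                return timers.isEmpty();
            });
            return fired;
        }

        public static void main(String[] args) {
            Map<String, TreeSet<Long>> pending = new HashMap<>();
            pending.put("keyA", new TreeSet<>(Arrays.asList(5L, 15L)));
            pending.put("keyB", new TreeSet<>(Arrays.asList(20L)));
            System.out.println(extractFired(10, pending)); // {keyA=[5]}
            System.out.println(pending); // keyA=[15], keyB=[20] remain (order may vary)
        }
    }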
-
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * The {@link Clock} providing the current time in the {@link TimeDomain#PROCESSING_TIME} domain.
-   */
-  private final Clock clock;
-
-  /**
-   * The {@link DirectGraph} representing the {@link Pipeline} this {@link WatermarkManager} tracks
-   * watermarks for.
-   */
-  private final ExecutableGraph<ExecutableT, CollectionT> graph;
-
-  /**
-   * The input and output watermark of each {@link AppliedPTransform}.
-   */
-  private final Map<ExecutableT, TransformWatermarks> transformToWatermarks;
-
-  /**
-   * A queue of pending updates to the state of this {@link WatermarkManager}.
-   */
-  private final ConcurrentLinkedQueue<PendingWatermarkUpdate> pendingUpdates;
-
-  /**
-   * A lock used to control concurrency for updating pending values.
-   */
-  private final Lock refreshLock;
-
-  /**
-   * The set of {@link AppliedPTransform AppliedPTransforms} whose watermarks potentially contain
-   * stale data and are awaiting a refresh.
-   */
-  @GuardedBy("refreshLock")
-  private final Set<ExecutableT> pendingRefreshes;
-
-  /**
-   * Creates a new {@link WatermarkManager}. All watermarks within the newly created {@link
-   * WatermarkManager} start at {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, the minimum watermark,
-   * with no watermark holds or pending elements.
-   *
-   * @param clock the clock to use to determine processing time
-   * @param graph the graph representing this pipeline
-   */
-  public static WatermarkManager<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> create(
-      Clock clock, ExecutableGraph<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> graph) {
-    return new WatermarkManager<>(clock, graph);
-  }
-
-  private WatermarkManager(Clock clock, ExecutableGraph<ExecutableT, CollectionT> graph) {
-    this.clock = clock;
-    this.graph = graph;
-
-    this.pendingUpdates = new ConcurrentLinkedQueue<>();
-
-    this.refreshLock = new ReentrantLock();
-    this.pendingRefreshes = new HashSet<>();
-
-    transformToWatermarks = new HashMap<>();
-
-    for (ExecutableT rootTransform : graph.getRootTransforms()) {
-      getTransformWatermark(rootTransform);
-    }
-    for (ExecutableT primitiveTransform : graph.getExecutables()) {
-      getTransformWatermark(primitiveTransform);
-    }
-  }
-
-  private TransformWatermarks getValueWatermark(CollectionT value) {
-    return getTransformWatermark(graph.getProducer(value));
-  }
-
-  private TransformWatermarks getTransformWatermark(ExecutableT executable) {
-    TransformWatermarks wms = transformToWatermarks.get(executable);
-    if (wms == null) {
-      List<Watermark> inputCollectionWatermarks = getInputWatermarks(executable);
-      AppliedPTransformInputWatermark inputWatermark =
-          new AppliedPTransformInputWatermark(inputCollectionWatermarks);
-      AppliedPTransformOutputWatermark outputWatermark =
-          new AppliedPTransformOutputWatermark(inputWatermark);
-
-      SynchronizedProcessingTimeInputWatermark inputProcessingWatermark =
-          new SynchronizedProcessingTimeInputWatermark(getInputProcessingWatermarks(executable));
-      SynchronizedProcessingTimeOutputWatermark outputProcessingWatermark =
-          new SynchronizedProcessingTimeOutputWatermark(inputProcessingWatermark);
-
-      wms =
-          new TransformWatermarks(
-              executable,
-              inputWatermark,
-              outputWatermark,
-              inputProcessingWatermark,
-              outputProcessingWatermark);
-      transformToWatermarks.put(executable, wms);
-    }
-    return wms;
-  }
-
-  private Collection<Watermark> getInputProcessingWatermarks(ExecutableT executable) {
-    ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
-    Collection<CollectionT> inputs = graph.getPerElementInputs(executable);
-    if (inputs.isEmpty()) {
-      inputWmsBuilder.add(THE_END_OF_TIME);
-    }
-    for (CollectionT input : inputs) {
-      Watermark producerOutputWatermark =
-          getValueWatermark(input).synchronizedProcessingOutputWatermark;
-      inputWmsBuilder.add(producerOutputWatermark);
-    }
-    return inputWmsBuilder.build();
-  }
-
-  private List<Watermark> getInputWatermarks(ExecutableT executable) {
-    ImmutableList.Builder<Watermark> inputWatermarksBuilder = ImmutableList.builder();
-    Collection<CollectionT> inputs = graph.getPerElementInputs(executable);
-    if (inputs.isEmpty()) {
-      inputWatermarksBuilder.add(THE_END_OF_TIME);
-    }
-    for (CollectionT input : inputs) {
-      Watermark producerOutputWatermark = getValueWatermark(input).outputWatermark;
-      inputWatermarksBuilder.add(producerOutputWatermark);
-    }
-    return inputWatermarksBuilder.build();
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Gets the input and output watermarks for an {@link AppliedPTransform}. If the {@link
-   * AppliedPTransform PTransform} has not processed any elements, the watermarks are at {@link
-   * BoundedWindow#TIMESTAMP_MIN_VALUE}.
-   *
-   * @return a snapshot of the input watermark and output watermark for the provided executable
-   */
-  public TransformWatermarks getWatermarks(ExecutableT executable) {
-    return transformToWatermarks.get(executable);
-  }
-
-  public void initialize(
-      Map<ExecutableT, ? extends Iterable<Bundle<?,  CollectionT>>> initialBundles) {
-    refreshLock.lock();
-    try {
-      for (Map.Entry<ExecutableT, ? extends Iterable<Bundle<?, CollectionT>>> rootEntry :
-          initialBundles.entrySet()) {
-        TransformWatermarks rootWms = transformToWatermarks.get(rootEntry.getKey());
-        for (Bundle<?, ? extends CollectionT> initialBundle : rootEntry.getValue()) {
-          rootWms.addPending(initialBundle);
-        }
-        pendingRefreshes.add(rootEntry.getKey());
-      }
-    } finally {
-      refreshLock.unlock();
-    }
-  }
-
-  /**
-   * Updates the watermarks of an executable with one or more inputs.
-   *
-   * <p>Each executable has two monotonically increasing watermarks: the input watermark, which can,
-   * at any time, be updated to equal:
-   * <pre>
-   * MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks))
-   * </pre>
-   * and the output watermark, which can, at any time, be updated to equal:
-   * <pre>
-   * MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))
-   * </pre>.
-   *
-   * <p>Updates to watermarks may not be immediately visible.
-   *
-   * @param completed the input that has completed
-   * @param timerUpdate the timers that were added, removed, and completed as part of producing this
-   *     update
-   * @param executable the executable applied to {@code completed} to produce the outputs
-   * @param unprocessedInputs inputs that could not be processed
-   * @param outputs outputs that were produced by the application of the {@code executable} to the
-   *     input
-   * @param earliestHold the earliest watermark hold in the executable's state.
-   */
-  public void updateWatermarks(
-      @Nullable Bundle<?, ? extends CollectionT> completed,
-      TimerUpdate timerUpdate,
-      ExecutableT executable,
-      @Nullable Bundle<?, ? extends CollectionT> unprocessedInputs,
-      Iterable<? extends Bundle<?, ? extends CollectionT>> outputs,
-      Instant earliestHold) {
-    pendingUpdates.offer(
-        PendingWatermarkUpdate.create(
-            executable, completed, timerUpdate, unprocessedInputs, outputs, earliestHold));
-    tryApplyPendingUpdates();
-  }
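The two update rules quoted in the javadoc above can be exercised directly. A minimal sketch, not part of the PR diff, with plain long millis and hypothetical names:

    // Illustrative sketch only, not part of the PR diff: the two watermark update rules.
    final class WatermarkUpdateRulesSketch {
        // MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks))
        static long inputWatermark(long current, long earliestPendingElement, long minUpstreamOutput) {
            return Math.max(current, Math.min(earliestPendingElement, minUpstreamOutput));
        }

        // MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))
        static long outputWatermark(long current, long inputWatermark, long minHold) {
            return Math.max(current, Math.min(inputWatermark, minHold));
        }

        public static void main(String[] args) {
            // Upstream has advanced to t=80, but an element at t=25 is still pending.
            long in = inputWatermark(Long.MIN_VALUE, 25, 80);   // 25
            // A state hold at t=10 keeps the output watermark behind the input watermark.
            long out = outputWatermark(Long.MIN_VALUE, in, 10); // 10
            System.out.println(in + " " + out);
        }
    }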
-
-  private void tryApplyPendingUpdates() {
-    if (refreshLock.tryLock()) {
-      try {
-        applyNUpdates(MAX_INCREMENTAL_UPDATES);
-      } finally {
-        refreshLock.unlock();
-      }
-    }
-  }
-
-  /**
-   * Applies all pending updates to this {@link WatermarkManager}, causing the pending state
-   * of all {@link TransformWatermarks} to be advanced as far as possible.
-   */
-  private void applyAllPendingUpdates() {
-    refreshLock.lock();
-    try {
-      applyNUpdates(-1);
-    } finally {
-      refreshLock.unlock();
-    }
-  }
-
-  /**
-   * Applies up to {@code numUpdates} pending updates, or all available updates if non-positive.
-   */
-  @GuardedBy("refreshLock")
-  private void applyNUpdates(int numUpdates) {
-    for (int i = 0; !pendingUpdates.isEmpty() && (i < numUpdates || numUpdates <= 0); i++) {
-      PendingWatermarkUpdate<ExecutableT, CollectionT> pending = pendingUpdates.poll();
-      applyPendingUpdate(pending);
-      pendingRefreshes.add(pending.getExecutable());
-    }
-  }
-
-  /** Apply a {@link PendingWatermarkUpdate} to the {@link WatermarkManager}. */
-  private void applyPendingUpdate(PendingWatermarkUpdate<ExecutableT, CollectionT> pending) {
-    ExecutableT executable = pending.getExecutable();
-    Bundle<?, ? extends CollectionT> inputBundle = pending.getInputBundle();
-
-    updatePending(
-        inputBundle,
-        pending.getTimerUpdate(),
-        executable,
-        pending.getUnprocessedInputs(),
-        pending.getOutputs());
-
-    TransformWatermarks transformWms = transformToWatermarks.get(executable);
-    transformWms.setEventTimeHold(
-        inputBundle == null ? null : inputBundle.getKey(), pending.getEarliestHold());
-  }
-
-  /**
-   * First adds all produced elements to the queue of pending elements for each consumer, then adds
-   * all pending timers to the collection of pending timers, then removes all completed and deleted
-   * timers from the collection of pending timers, then removes all completed elements from the
-   * pending queue of the executable.
-   *
-   * <p>It is required that all newly pending elements are added to the queue of pending elements
-   * for each consumer prior to the completed elements being removed, as doing otherwise could cause
-   * a Watermark to appear in a state in which the upstream (completed) element does not hold the
-   * watermark but the element it produced is not yet pending. This can cause the watermark to
-   * erroneously advance.
-   *
-   * <p>See {@link #updateWatermarks(Bundle, TimerUpdate, Object, Bundle,
-   * Iterable, Instant)} for information about the parameters of this method.
-   */
-  private void updatePending(
-      Bundle<?, ? extends CollectionT> input,
-      TimerUpdate timerUpdate,
-      ExecutableT executable,
-      @Nullable Bundle<?, ? extends CollectionT> unprocessedInputs,
-      Iterable<? extends Bundle<?, ? extends CollectionT>> outputs) {
-    // Newly pending elements must be added before completed elements are removed, as the two
-    // do not share a Mutex within this call and thus can be interleaved with external calls to
-    // refresh.
-    for (Bundle<?, ? extends CollectionT> bundle : outputs) {
-      for (ExecutableT consumer :
-          // TODO: Remove this cast once CommittedBundle returns a CollectionT
-          graph.getPerElementConsumers((CollectionT) bundle.getPCollection())) {
-        TransformWatermarks watermarks = transformToWatermarks.get(consumer);
-        watermarks.addPending(bundle);
-      }
-    }
-
-    TransformWatermarks completedTransform = transformToWatermarks.get(executable);
-    if (unprocessedInputs != null) {
-      // Add the unprocessed inputs
-      completedTransform.addPending(unprocessedInputs);
-    }
-    completedTransform.updateTimers(timerUpdate);
-    if (input != null) {
-      completedTransform.removePending(input);
-    }
-  }
-
-  /**
-   * Refresh the watermarks contained within this {@link WatermarkManager}, causing all
-   * watermarks to be advanced as far as possible.
-   */
-  synchronized void refreshAll() {
-    refreshLock.lock();
-    try {
-      applyAllPendingUpdates();
-      Set<ExecutableT> toRefresh = pendingRefreshes;
-      while (!toRefresh.isEmpty()) {
-        toRefresh = refreshAllOf(toRefresh);
-      }
-    } finally {
-      refreshLock.unlock();
-    }
-  }
-
-  private Set<ExecutableT> refreshAllOf(Set<ExecutableT> toRefresh) {
-    Set<ExecutableT> newRefreshes = new HashSet<>();
-    for (ExecutableT executable : toRefresh) {
-      newRefreshes.addAll(refreshWatermarks(executable));
-    }
-    return newRefreshes;
-  }
-
-  private Set<ExecutableT> refreshWatermarks(ExecutableT toRefresh) {
-    TransformWatermarks myWatermarks = transformToWatermarks.get(toRefresh);
-    WatermarkUpdate updateResult = myWatermarks.refresh();
-    if (updateResult.isAdvanced()) {
-      Set<ExecutableT> additionalRefreshes = new HashSet<>();
-      for (CollectionT outputPValue : graph.getProduced(toRefresh)) {
-        additionalRefreshes.addAll(graph.getPerElementConsumers(outputPValue));
-      }
-      return additionalRefreshes;
-    }
-    return Collections.emptySet();
-  }
-
-  /**
-   * Returns the collection of {@link FiredTimers} for each {@link PTransform} that has timers
-   * which have fired. All of the returned timers will be removed from this {@link WatermarkManager}.
-   */
-  public Collection<FiredTimers<ExecutableT>> extractFiredTimers() {
-    Collection<FiredTimers<ExecutableT>> allTimers = new ArrayList<>();
-    refreshLock.lock();
-    try {
-      for (Map.Entry<ExecutableT, TransformWatermarks> watermarksEntry
-          : transformToWatermarks.entrySet()) {
-        Collection<FiredTimers<ExecutableT>> firedTimers =
-            watermarksEntry.getValue().extractFiredTimers();
-        allTimers.addAll(firedTimers);
-      }
-      return allTimers;
-    } finally {
-      refreshLock.unlock();
-    }
-  }
-
-  /**
-   * A (key, Instant) pair that holds the watermark. Holds are per-key, but the watermark is global,
-   * and as such the watermark manager must track holds and the release of holds on a per-key basis.
-   *
-   * <p>The {@link #compareTo(KeyedHold)} method of {@link KeyedHold} is not consistent with equals,
-   * as the key is arbitrarily ordered via identity, rather than object equality.
-   */
-  private static final class KeyedHold implements Comparable<KeyedHold> {
-    private static final Ordering<Object> KEY_ORDERING = Ordering.arbitrary().nullsLast();
-
-    private final Object key;
-    private final Instant timestamp;
-
-    /**
-     * Create a new KeyedHold with the specified key and timestamp.
-     */
-    public static KeyedHold of(Object key, Instant timestamp) {
-      return new KeyedHold(key, MoreObjects.firstNonNull(timestamp, THE_END_OF_TIME.get()));
-    }
-
-    private KeyedHold(Object key, Instant timestamp) {
-      this.key = key;
-      this.timestamp = timestamp;
-    }
-
-    @Override
-    public int compareTo(KeyedHold that) {
-      return ComparisonChain.start()
-          .compare(this.timestamp, that.timestamp)
-          .compare(this.key, that.key, KEY_ORDERING)
-          .result();
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(timestamp, key);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (other == null || !(other instanceof KeyedHold)) {
-        return false;
-      }
-      KeyedHold that = (KeyedHold) other;
-      return Objects.equals(this.timestamp, that.timestamp) && Objects.equals(this.key, that.key);
-    }
-
-    /**
-     * Get the value of this {@link KeyedHold}.
-     */
-    public Instant getTimestamp() {
-      return timestamp;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(KeyedHold.class)
-          .add("key", key)
-          .add("hold", timestamp)
-          .toString();
-    }
-  }
-
-  private static class PerKeyHolds {
-    private final Map<Object, KeyedHold> keyedHolds;
-    private final NavigableSet<KeyedHold> allHolds;
-
-    private PerKeyHolds() {
-      this.keyedHolds = new HashMap<>();
-      this.allHolds = new TreeSet<>();
-    }
-
-    /**
-     * Gets the minimum hold across all keys in this {@link PerKeyHolds}, or THE_END_OF_TIME if
-     * there are no holds within this {@link PerKeyHolds}.
-     */
-    public Instant getMinHold() {
-      return allHolds.isEmpty() ? THE_END_OF_TIME.get() : allHolds.first().getTimestamp();
-    }
-
-    /**
-     * Updates the hold of the provided key to the provided value, removing any other holds for
-     * the same key.
-     */
-    public void updateHold(@Nullable Object key, Instant newHold) {
-      removeHold(key);
-      KeyedHold newKeyedHold = KeyedHold.of(key, newHold);
-      keyedHolds.put(key, newKeyedHold);
-      allHolds.add(newKeyedHold);
-    }
-
-    /**
-     * Removes the hold of the provided key.
-     */
-    public void removeHold(Object key) {
-      KeyedHold oldHold = keyedHolds.remove(key);
-      if (oldHold != null) {
-        allHolds.remove(oldHold);
-      }
-    }
-  }
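The per-key hold bookkeeping above boils down to two facts: updating a key replaces that key's previous hold, and the minimum hold across all keys is what restrains the output watermark. A simplified standalone sketch, not part of the PR diff, with String keys in place of arbitrary key objects:

    // Illustrative sketch only, not part of the PR diff: per-key holds with a global minimum.
    import java.util.HashMap;
    import java.util.Map;

    final class PerKeyHoldsSketch {
        private final Map<String, Long> holds = new HashMap<>();

        // A null hold releases the key's hold, as in updateHold above.
        void updateHold(String key, Long newHold) {
            if (newHold == null) {
                holds.remove(key);
            } else {
                holds.put(key, newHold);
            }
        }

        // The minimum across keys, or "end of time" when nothing is held.
        long minHold() {
            return holds.values().stream().mapToLong(Long::longValue).min().orElse(Long.MAX_VALUE);
        }

        public static void main(String[] args) {
            PerKeyHoldsSketch perKey = new PerKeyHoldsSketch();
            perKey.updateHold("keyA", 30L);
            perKey.updateHold("keyB", 50L);
            System.out.println(perKey.minHold()); // 30
            perKey.updateHold("keyA", null);      // keyA releases its hold
            System.out.println(perKey.minHold()); // 50
        }
    }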
-
-  /**
-   * A reference to the input and output watermarks of an {@link AppliedPTransform}.
-   */
-  public class TransformWatermarks {
-    private final ExecutableT executable;
-
-    private final AppliedPTransformInputWatermark inputWatermark;
-    private final AppliedPTransformOutputWatermark outputWatermark;
-
-    private final SynchronizedProcessingTimeInputWatermark synchronizedProcessingInputWatermark;
-    private final SynchronizedProcessingTimeOutputWatermark synchronizedProcessingOutputWatermark;
-
-    private Instant latestSynchronizedInputWm;
-    private Instant latestSynchronizedOutputWm;
-
-    private TransformWatermarks(
-        ExecutableT executable,
-        AppliedPTransformInputWatermark inputWatermark,
-        AppliedPTransformOutputWatermark outputWatermark,
-        SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark,
-        SynchronizedProcessingTimeOutputWatermark outputSynchProcessingWatermark) {
-      this.executable = executable;
-      this.inputWatermark = inputWatermark;
-      this.outputWatermark = outputWatermark;
-
-      this.synchronizedProcessingInputWatermark = inputSynchProcessingWatermark;
-      this.synchronizedProcessingOutputWatermark = outputSynchProcessingWatermark;
-      this.latestSynchronizedInputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      this.latestSynchronizedOutputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
-    }
-
-    /**
-     * Returns the input watermark of the {@link AppliedPTransform}.
-     */
-    public Instant getInputWatermark() {
-      return checkNotNull(inputWatermark.get());
-    }
-
-    /**
-     * Returns the output watermark of the {@link AppliedPTransform}.
-     */
-    public Instant getOutputWatermark() {
-      return outputWatermark.get();
-    }
-
-    /**
-     * Returns the synchronized processing input time of the {@link AppliedPTransform}.
-     *
-     * <p>The returned value is guaranteed to be monotonically increasing, and outside of the
-     * presence of holds, will increase as the system time progresses.
-     */
-    public synchronized Instant getSynchronizedProcessingInputTime() {
-      latestSynchronizedInputWm = INSTANT_ORDERING.max(
-          latestSynchronizedInputWm,
-          INSTANT_ORDERING.min(clock.now(), synchronizedProcessingInputWatermark.get()));
-      return latestSynchronizedInputWm;
-    }
-
-    /**
-     * Returns the synchronized processing output time of the {@link AppliedPTransform}.
-     *
-     * <p>The returned value is guaranteed to be monotonically increasing, and outside of the
-     * presence of holds, will increase as the system time progresses.
-     */
-    public synchronized Instant getSynchronizedProcessingOutputTime() {
-      latestSynchronizedOutputWm = INSTANT_ORDERING.max(
-          latestSynchronizedOutputWm,
-          INSTANT_ORDERING.min(clock.now(), synchronizedProcessingOutputWatermark.get()));
-      return latestSynchronizedOutputWm;
-    }
-
-    private WatermarkUpdate refresh() {
-      inputWatermark.refresh();
-      synchronizedProcessingInputWatermark.refresh();
-      WatermarkUpdate eventOutputUpdate = outputWatermark.refresh();
-      WatermarkUpdate syncOutputUpdate = synchronizedProcessingOutputWatermark.refresh();
-      return eventOutputUpdate.union(syncOutputUpdate);
-    }
-
-    private void setEventTimeHold(Object key, Instant newHold) {
-      outputWatermark.updateHold(key, newHold);
-    }
-
-    private void removePending(Bundle<?, ?> bundle) {
-      inputWatermark.removePending(bundle);
-      synchronizedProcessingInputWatermark.removePending(bundle);
-    }
-
-    private void addPending(Bundle<?, ?> bundle) {
-      inputWatermark.addPending(bundle);
-      synchronizedProcessingInputWatermark.addPending(bundle);
-    }
-
-    private Collection<FiredTimers<ExecutableT>> extractFiredTimers() {
-      Map<StructuralKey<?>, List<TimerData>> eventTimeTimers =
-          inputWatermark.extractFiredEventTimeTimers();
-      Map<StructuralKey<?>, List<TimerData>> processingTimers;
-      Map<StructuralKey<?>, List<TimerData>> synchronizedTimers;
-      processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
-          TimeDomain.PROCESSING_TIME, clock.now());
-      synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
-          TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime());
-
-      Map<StructuralKey<?>, List<TimerData>> timersPerKey =
-          groupFiredTimers(eventTimeTimers, processingTimers, synchronizedTimers);
-      Collection<FiredTimers<ExecutableT>> keyFiredTimers = new ArrayList<>(timersPerKey.size());
-      for (Map.Entry<StructuralKey<?>, List<TimerData>> firedTimers :
-          timersPerKey.entrySet()) {
-        keyFiredTimers.add(
-            new FiredTimers<>(executable, firedTimers.getKey(), firedTimers.getValue()));
-      }
-      return keyFiredTimers;
-    }
-
-    @SafeVarargs
-    private final Map<StructuralKey<?>, List<TimerData>> groupFiredTimers(
-        Map<StructuralKey<?>, List<TimerData>>... timersToGroup) {
-      Map<StructuralKey<?>, List<TimerData>> groupedTimers = new HashMap<>();
-      for (Map<StructuralKey<?>, List<TimerData>> subGroup : timersToGroup) {
-        for (Map.Entry<StructuralKey<?>, List<TimerData>> newTimers : subGroup.entrySet()) {
-          List<TimerData> grouped =
-              groupedTimers.computeIfAbsent(newTimers.getKey(), k -> new ArrayList<>());
-          grouped.addAll(newTimers.getValue());
-        }
-      }
-      return groupedTimers;
-    }
-
-    private void updateTimers(TimerUpdate update) {
-      inputWatermark.updateTimers(update);
-      synchronizedProcessingInputWatermark.updateTimers(update);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(TransformWatermarks.class)
-          .add("inputWatermark", inputWatermark)
-          .add("outputWatermark", outputWatermark)
-          .add("inputProcessingTime", synchronizedProcessingInputWatermark)
-          .add("outputProcessingTime", synchronizedProcessingOutputWatermark)
-          .toString();
-    }
-  }
-
-  /**
-   * A collection of newly set, deleted, and completed timers.
-   *
-   * <p>setTimers and deletedTimers are collections of {@link TimerData} that were set and deleted,
-   * respectively, in the {@link TimerInternals} of an executed step. completedTimers are timers
-   * that were delivered as the input to the executed step.
-   */
-  public static class TimerUpdate {
-    private final StructuralKey<?> key;
-    private final Iterable<? extends TimerData> completedTimers;
-
-    private final Iterable<? extends TimerData> setTimers;
-    private final Iterable<? extends TimerData> deletedTimers;
-
-    /**
-     * Returns a TimerUpdate for a null key with no timers.
-     */
-    public static TimerUpdate empty() {
-      return new TimerUpdate(
-          null, Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
-    }
-
-    /**
-     * Creates a new {@link TimerUpdate} builder for the provided key. Completed, set, and deleted
-     * timers can then be added to it before building.
-     */
-    public static TimerUpdateBuilder builder(StructuralKey<?> key) {
-      return new TimerUpdateBuilder(key);
-    }
-
-    /**
-     * A {@link TimerUpdate} builder that needs to be provided with set timers and deleted timers.
-     */
-    public static final class TimerUpdateBuilder {
-      private final StructuralKey<?> key;
-      private final Collection<TimerData> completedTimers;
-      private final Collection<TimerData> setTimers;
-      private final Collection<TimerData> deletedTimers;
-
-      private TimerUpdateBuilder(StructuralKey<?> key) {
-        this.key = key;
-        this.completedTimers = new LinkedHashSet<>();
-        this.setTimers = new LinkedHashSet<>();
-        this.deletedTimers = new LinkedHashSet<>();
-      }
-
-      /**
-       * Adds all of the provided timers to the collection of completed timers, and returns this
-       * {@link TimerUpdateBuilder}.
-       */
-      public TimerUpdateBuilder withCompletedTimers(Iterable<TimerData> completedTimers) {
-        Iterables.addAll(this.completedTimers, completedTimers);
-        return this;
-      }
-
-      /**
-       * Adds the provided timer to the collection of set timers, removing it from deleted timers if
-       * it has previously been deleted. Returns this {@link TimerUpdateBuilder}.
-       */
-      public TimerUpdateBuilder setTimer(TimerData setTimer) {
-        checkArgument(
-            setTimer.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
-            "Got a timer for after the end of time (%s), got %s",
-            BoundedWindow.TIMESTAMP_MAX_VALUE,
-            setTimer.getTimestamp());
-        deletedTimers.remove(setTimer);
-        setTimers.add(setTimer);
-        return this;
-      }
-
-      /**
-       * Adds the provided timer to the collection of deleted timers, removing it from set timers if
-       * it has previously been set. Returns this {@link TimerUpdateBuilder}.
-       */
-      public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) {
-        deletedTimers.add(deletedTimer);
-        setTimers.remove(deletedTimer);
-        return this;
-      }
-
-      /**
-       * Returns a new {@link TimerUpdate} with the most recently set completedTimers, setTimers,
-       * and deletedTimers.
-       */
-      public TimerUpdate build() {
-        return new TimerUpdate(
-            key,
-            ImmutableList.copyOf(completedTimers),
-            ImmutableList.copyOf(setTimers),
-            ImmutableList.copyOf(deletedTimers));
-      }
-    }
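For orientation, the following sketch (not part of the PR diff) shows how the builder above is typically driven by an executed step. The key and TimerData instances are assumed to be supplied by the caller, the WatermarkManager import path follows the test imports in this PR, and only methods visible in this file are used.

    // Illustrative sketch only, not part of the PR diff. The TimerData and StructuralKey
    // arguments are assumed to be produced by the executing step.
    import java.util.Collections;
    import org.apache.beam.runners.core.TimerInternals.TimerData;
    import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
    import org.apache.beam.runners.local.StructuralKey;

    final class TimerUpdateBuilderSketch {
        static TimerUpdate describeStep(
                StructuralKey<?> key, TimerData firedTimer, TimerData staleTimer, TimerData newTimer) {
            return TimerUpdate.builder(key)
                // Timers that were delivered as input to this step:
                .withCompletedTimers(Collections.singletonList(firedTimer))
                // A timer the step deleted while processing:
                .deletedTimer(staleTimer)
                // A timer the step set; setting removes it from the deleted set if present.
                .setTimer(newTimer)
                .build();
        }
    }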
-
-    private TimerUpdate(
-        StructuralKey<?> key,
-        Iterable<? extends TimerData> completedTimers,
-        Iterable<? extends TimerData> setTimers,
-        Iterable<? extends TimerData> deletedTimers) {
-      this.key = key;
-      this.completedTimers = completedTimers;
-      this.setTimers = setTimers;
-      this.deletedTimers = deletedTimers;
-    }
-
-    @VisibleForTesting
-    StructuralKey<?> getKey() {
-      return key;
-    }
-
-    @VisibleForTesting
-    Iterable<? extends TimerData> getCompletedTimers() {
-      return completedTimers;
-    }
-
-    @VisibleForTesting
-    Iterable<? extends TimerData> getSetTimers() {
-      return setTimers;
-    }
-
-    @VisibleForTesting
-    Iterable<? extends TimerData> getDeletedTimers() {
-      return deletedTimers;
-    }
-
-    /**
-     * Returns a {@link TimerUpdate} that is like this one, but with the specified completed timers.
-     */
-    public TimerUpdate withCompletedTimers(Iterable<TimerData> completedTimers) {
-      return new TimerUpdate(this.key, completedTimers, setTimers, deletedTimers);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(key, completedTimers, setTimers, deletedTimers);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (other == null || !(other instanceof TimerUpdate)) {
-        return false;
-      }
-      TimerUpdate that = (TimerUpdate) other;
-      return Objects.equals(this.key, that.key)
-          && Objects.equals(this.completedTimers, that.completedTimers)
-          && Objects.equals(this.setTimers, that.setTimers)
-          && Objects.equals(this.deletedTimers, that.deletedTimers);
-    }
-  }
-
-  /**
-   * A collection of fired {@link TimerData}, along with the key and executable to which they
-   * should be delivered. A timer fires at the executable that set it, for a specific key, when
-   * the time domain in which it lives progresses past a specified time, as determined by the
-   * {@link WatermarkManager}.
-   */
-  public static class FiredTimers<ExecutableT> {
-    /** The executable the timers were set at and will be delivered to. */
-    private final ExecutableT executable;
-    /** The key the timers were set for and will be delivered to. */
-    private final StructuralKey<?> key;
-    private final Collection<TimerData> timers;
-
-    private FiredTimers(
-        ExecutableT executable, StructuralKey<?> key, Collection<TimerData> timers) {
-      this.executable = executable;
-      this.key = key;
-      this.timers = timers;
-    }
-
-    public ExecutableT getExecutable() {
-      return executable;
-    }
-
-    public StructuralKey<?> getKey() {
-      return key;
-    }
-
-    /**
-     * Gets all of the timers that have fired for this executable and key. If no timers have
-     * fired, returns an empty collection.
-     *
-     * <p>Timers within a {@link TimeDomain} are guaranteed to be in order of increasing timestamp.
-     */
-    public Collection<TimerData> getTimers() {
-      return timers;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(FiredTimers.class).add("timers", timers).toString();
-    }
-  }
-
-  private static class BundleByElementTimestampComparator extends Ordering<Bundle<?, ?>>
-      implements Serializable {
-    @Override
-    public int compare(Bundle<?, ?> o1, Bundle<?, ?> o2) {
-      return ComparisonChain.start()
-          .compare(o1.getMinimumTimestamp(), o2.getMinimumTimestamp())
-          .result();
-    }
-  }
-
-  @AutoValue
-  abstract static class PendingWatermarkUpdate<ExecutableT, CollectionT> {
-    abstract ExecutableT getExecutable();
-
-    @Nullable
-    abstract Bundle<?, ? extends CollectionT> getInputBundle();
-
-    abstract TimerUpdate getTimerUpdate();
-
-    @Nullable
-    abstract Bundle<?, ? extends CollectionT> getUnprocessedInputs();
-
-    abstract Iterable<? extends Bundle<?, ? extends CollectionT>> getOutputs();
-
-    abstract Instant getEarliestHold();
-
-    public static <ExecutableT, CollectionT>
-        PendingWatermarkUpdate<ExecutableT, CollectionT> create(
-            ExecutableT executable,
-            @Nullable Bundle<?, ? extends CollectionT> inputBundle,
-            TimerUpdate timerUpdate,
-            @Nullable Bundle<?, ? extends CollectionT> unprocessedInputs,
-            Iterable<? extends Bundle<?, ? extends CollectionT>> outputs,
-            Instant earliestHold) {
-      return new AutoValue_WatermarkManager_PendingWatermarkUpdate<>(
-          executable, inputBundle, timerUpdate, unprocessedInputs, outputs, earliestHold);
-    }
-  }
-}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/WindowEvaluatorFactory.java
index 44e9c6ac38b..e4aa4a34da4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/WindowEvaluatorFactory.java
@@ -20,14 +20,13 @@
 import com.google.common.collect.Iterables;
 import java.util.Collection;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.core.construction.WindowIntoTranslation;
-import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Instant;
 
 /**
@@ -43,41 +42,29 @@
 
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle
- )
-      throws Exception {
-    return createTransformEvaluator((AppliedPTransform) application);
+      PTransformNode application, @Nullable CommittedBundle<?> inputBundle)  {
+    return createTransformEvaluator(application);
   }
 
-  private <InputT> TransformEvaluator<InputT> createTransformEvaluator(
-      AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Assign<InputT>>
-          transform) {
+  private <InputT> TransformEvaluator<InputT> createTransformEvaluator(PTransformNode transform) {
+    WindowFn<? super InputT, ?> fn = null;
 
-    WindowFn<? super InputT, ?> fn = (WindowFn) WindowIntoTranslation.getWindowFn(transform);
-
-    UncommittedBundle<InputT> outputBundle =
-        evaluationContext.createBundle(
-            (PCollection<InputT>) Iterables.getOnlyElement(transform.getOutputs().values()));
-    if (fn == null) {
-      return PassthroughTransformEvaluator.create(transform, outputBundle);
-    }
-    return new WindowIntoEvaluator<>(transform, fn, outputBundle);
+    PCollectionNode outputPCollection = null;
+    evaluationContext.createBundle(outputPCollection);
+    throw new UnsupportedOperationException("Not yet migrated");
   }
 
   @Override
   public void cleanup() {}
 
   private static class WindowIntoEvaluator<InputT> implements TransformEvaluator<InputT> {
-    private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Assign<InputT>>
-        transform;
+    private final PTransformNode transform;
     private final WindowFn<InputT, ?> windowFn;
     private final UncommittedBundle<InputT> outputBundle;
 
     @SuppressWarnings("unchecked")
     public WindowIntoEvaluator(
-        AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Assign<InputT>>
-            transform,
+        PTransformNode transform,
         WindowFn<? super InputT, ?> windowFn,
         UncommittedBundle<InputT> outputBundle) {
       this.outputBundle = outputBundle;
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
index 029deb47235..2493727538d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
@@ -23,6 +23,7 @@
 
 import com.google.common.collect.ImmutableSet;
 import java.util.Set;
+import org.apache.beam.runners.direct.portable.ExecutableGraphBuilder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.metrics.MetricResults;
@@ -63,6 +64,9 @@ public void testDirectRunnerApiSurface() throws Exception {
             .pruningClass(DisplayData.Builder.class)
             .pruningClass(MetricResults.class)
             .pruningClass(DirectGraphs.class)
+            .pruningClass(
+                WatermarkManager.class /* TODO: BEAM-4237 Consider moving to local-java */)
+            .pruningClass(ExecutableGraphBuilder.class)
             .pruningPattern("org[.]apache[.]beam[.].*Test.*")
             .pruningPattern("org[.]apache[.]beam[.].*IT")
             .pruningPattern("java[.]io.*")
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/CommittedResultTest.java
index 0358296f442..9c93eaea7f7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/CommittedResultTest.java
@@ -26,17 +26,14 @@
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded.Enum;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.direct.portable.CommittedResult.OutputType;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Rule;
@@ -51,41 +48,33 @@
   @Rule
   public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
-  private transient PCollection<Integer> created = p.apply(Create.of(1, 2));
-  private transient AppliedPTransform<?, ?, ?> transform =
-      AppliedPTransform.<PBegin, PDone, PTransform<PBegin, PDone>>of(
-          "foo",
-          p.begin().expand(),
-          PDone.in(p).expand(),
-          new PTransform<PBegin, PDone>() {
-            @Override
-            public PDone expand(PBegin begin) {
-              throw new IllegalArgumentException("Should never be applied");
-            }
-          },
-          p);
+  private transient PCollectionNode created =
+      PipelineNode.pCollection(
+          "created", RunnerApi.PCollection.newBuilder().setUniqueName("created").build());
+  private transient PTransformNode transform =
+      PipelineNode.pTransform("foo", RunnerApi.PTransform.getDefaultInstance());
   private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
   @Test
   public void getTransformExtractsFromResult() {
-    CommittedResult<AppliedPTransform<?, ?, ?>> result =
+    CommittedResult<PTransformNode> result =
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
             Optional.absent(),
             Collections.emptyList(),
             EnumSet.noneOf(OutputType.class));
 
-    assertThat(result.getExecutable(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform));
+    assertThat(result.getExecutable(), Matchers.equalTo(transform));
   }
 
   @Test
   public void getUncommittedElementsEqualInput() {
     CommittedBundle<Integer> bundle =
         bundleFactory
-            .createBundle(created)
+            .<Integer>createBundle(created)
             .add(WindowedValue.valueInGlobalWindow(2))
             .commit(Instant.now());
-    CommittedResult<AppliedPTransform<?, ?, ?>> result =
+    CommittedResult<PTransformNode> result =
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
             Optional.of(bundle),
@@ -97,7 +86,7 @@ public void getUncommittedElementsEqualInput() {
 
   @Test
   public void getUncommittedElementsNull() {
-    CommittedResult<AppliedPTransform<?, ?, ?>> result =
+    CommittedResult<PTransformNode> result =
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
             Optional.absent(),
@@ -112,22 +101,24 @@ public void getOutputsEqualInput() {
     List<? extends CommittedBundle<Integer>> outputs =
         ImmutableList.of(
             bundleFactory
-                .createBundle(
-                    PCollection.createPrimitiveOutputInternal(
-                        p,
-                        WindowingStrategy.globalDefault(),
-                        PCollection.IsBounded.BOUNDED,
-                        VarIntCoder.of()))
+                .<Integer>createBundle(
+                    PipelineNode.pCollection(
+                        "bounded",
+                        RunnerApi.PCollection.newBuilder()
+                            .setUniqueName("bounded")
+                            .setIsBounded(Enum.BOUNDED)
+                            .build()))
                 .commit(Instant.now()),
             bundleFactory
-                .createBundle(
-                    PCollection.createPrimitiveOutputInternal(
-                        p,
-                        WindowingStrategy.globalDefault(),
-                        PCollection.IsBounded.UNBOUNDED,
-                        VarIntCoder.of()))
+                .<Integer>createBundle(
+                    PipelineNode.pCollection(
+                        "unbounded",
+                        RunnerApi.PCollection.newBuilder()
+                            .setUniqueName("unbounded")
+                            .setIsBounded(Enum.UNBOUNDED)
+                            .build()))
                 .commit(Instant.now()));
-    CommittedResult<AppliedPTransform<?, ?, ?>> result =
+    CommittedResult<PTransformNode> result =
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
             Optional.absent(),
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectTimerInternalsTest.java
index 18eafccd82e..f5c73fdbf71 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectTimerInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectTimerInternalsTest.java
@@ -24,9 +24,9 @@
 
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.direct.portable.WatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.portable.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
-import org.apache.beam.runners.direct.portable.WatermarkManager.TransformWatermarks;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
+import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.state.TimeDomain;
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectTransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectTransformExecutorTest.java
index 3dbac5c4c52..4eb02657fce 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectTransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectTransformExecutorTest.java
@@ -34,16 +34,15 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.runners.direct.DirectGraphs;
-import org.apache.beam.runners.direct.ExecutableGraph;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.direct.portable.CommittedResult.OutputType;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Before;
@@ -59,10 +58,18 @@
 @RunWith(JUnit4.class)
 public class DirectTransformExecutorTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
-  private PCollection<String> created;
-
-  private AppliedPTransform<?, ?, ?> createdProducer;
-  private AppliedPTransform<?, ?, ?> downstreamProducer;
+  private final PCollectionNode created =
+      PipelineNode.pCollection(
+          "created", PCollection.newBuilder().setUniqueName("created").build());
+
+  private final PTransformNode createdProducer =
+      PipelineNode.pTransform(
+          "create",
+          PTransform.newBuilder().putOutputs("created", "created").setUniqueName("create").build());
+  private final PTransformNode downstreamProducer =
+      PipelineNode.pTransform(
+          "downstream",
+          PTransform.newBuilder().putInputs("input", "created").setUniqueName("downstream").build());
 
   private CountDownLatch evaluatorCompleted;
 
@@ -87,14 +94,8 @@ public void setup() {
     evaluatorCompleted = new CountDownLatch(1);
     completionCallback = new RegisteringCompletionCallback(evaluatorCompleted);
 
-    created = p.apply(Create.of("foo", "spam", "third"));
-    PCollection<KV<Integer, String>> downstream = created.apply(WithKeys.of(3));
-
-    DirectGraphs.performDirectOverrides(p);
-    ExecutableGraph<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> graph =
-        DirectGraphs.getGraph(p);
-    createdProducer = graph.getProducer(created);
-    downstreamProducer = graph.getProducer(downstream);
+    PipelineNode.pCollection(
+        "created", RunnerApi.PCollection.newBuilder().setUniqueName("created").build());
 
     when(evaluationContext.getMetrics()).thenReturn(metrics);
   }
@@ -175,7 +176,12 @@ public void processElement(WindowedValue<String> element) throws Exception {
     WindowedValue<String> spam = WindowedValue.valueInGlobalWindow("spam");
     WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third");
     CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(created).add(foo).add(spam).add(third).commit(Instant.now());
+        bundleFactory
+            .<String>createBundle(created)
+            .add(foo)
+            .add(spam)
+            .add(third)
+            .commit(Instant.now());
     when(registry.<String>forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator);
 
     DirectTransformExecutor<String> executor =
@@ -216,7 +222,7 @@ public void processElement(WindowedValue<String> element) throws Exception {
 
     WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
     CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(created).add(foo).commit(Instant.now());
+        bundleFactory.<String>createBundle(created).add(foo).commit(Instant.now());
     when(registry.<String>forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator);
 
     DirectTransformExecutor<String> executor =
@@ -249,7 +255,8 @@ public void processElement(WindowedValue<String> element) throws Exception {}
           }
         };
 
-    CommittedBundle<String> inputBundle = bundleFactory.createBundle(created).commit(Instant.now());
+    CommittedBundle<String> inputBundle =
+        bundleFactory.<String>createBundle(created).commit(Instant.now());
     when(registry.<String>forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator);
 
     DirectTransformExecutor<String> executor =
@@ -300,7 +307,7 @@ public CommittedResult handleResult(CommittedBundle<?> inputBundle, TransformRes
     }
 
     @Override
-    public void handleEmpty(AppliedPTransform<?, ?, ?> transform) {
+    public void handleEmpty(PTransformNode transform) {
       handledEmpty = true;
       onMethod.countDown();
     }
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/EvaluationContextTest.java
index 315e3226fcd..9809c481dba 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/EvaluationContextTest.java
@@ -34,34 +34,26 @@
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.direct.DirectGraphs;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.direct.ExecutableGraph;
+import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.portable.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.portable.WatermarkManager.FiredTimers;
-import org.apache.beam.runners.direct.portable.WatermarkManager.TimerUpdate;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -71,36 +63,32 @@
 public class EvaluationContextTest {
   private EvaluationContext context;
 
-  private PCollection<Integer> created;
-  private PCollection<KV<String, Integer>> downstream;
-  private PCollectionView<Iterable<Integer>> view;
-  private PCollection<Long> unbounded;
+  private PCollectionNode created;
+  private PCollectionNode downstream;
 
-  private ExecutableGraph<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> graph;
+  private ExecutableGraph<PTransformNode, PCollectionNode> graph;
 
-  private AppliedPTransform<?, ?, ?> createdProducer;
-  private AppliedPTransform<?, ?, ?> downstreamProducer;
-  private AppliedPTransform<?, ?, ?> unboundedProducer;
-
-  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+  private PTransformNode createdProducer;
+  private PTransformNode downstreamProducer;
+  private PTransformNode unboundedProducer;
 
   @Before
   public void setup() {
-    created = p.apply(Create.of(1, 2, 3));
-    downstream = created.apply(WithKeys.of("foo"));
-    view = created.apply(View.asIterable());
-    unbounded = p.apply(GenerateSequence.from(0));
+    ExecutableGraphBuilder graphBuilder =
+        ExecutableGraphBuilder.create()
+            .addTransform("create", null, "created")
+            .addTransform("downstream", "created", "downstream.out")
+            .addTransform("unbounded", null, "unbounded.out");
+
+    graph = graphBuilder.toGraph();
+    created = graphBuilder.collectionNode("created");
+    downstream = graphBuilder.collectionNode("downstream.out");
+    createdProducer = graphBuilder.transformNode("create");
+    downstreamProducer = graphBuilder.transformNode("downstream");
+    unboundedProducer = graphBuilder.transformNode("unbounded");
 
     BundleFactory bundleFactory = ImmutableListBundleFactory.create();
-    DirectGraphs.performDirectOverrides(p);
-    graph = DirectGraphs.getGraph(p);
-    context =
-        EvaluationContext.create(
-            NanosOffsetClock.create(), bundleFactory, graph, ImmutableSet.of());
-
-    createdProducer = graph.getProducer(created);
-    downstreamProducer = graph.getProducer(downstream);
-    unboundedProducer = graph.getProducer(unbounded);
+    context = EvaluationContext.create(Instant::new, bundleFactory, graph, ImmutableSet.of());
   }
 
   @Test
@@ -268,10 +256,10 @@ public void extractFiredTimersExtractsTimers() {
     // Should cause the downstream timer to fire
     context.handleResult(null, ImmutableList.of(), advanceResult);
 
-    Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> fired = context.extractFiredTimers();
+    Collection<FiredTimers<PTransformNode>> fired = context.extractFiredTimers();
     assertThat(Iterables.getOnlyElement(fired).getKey(), Matchers.equalTo(key));
 
-    FiredTimers<AppliedPTransform<?, ?, ?>> firedForKey = Iterables.getOnlyElement(fired);
+    FiredTimers<PTransformNode> firedForKey = Iterables.getOnlyElement(fired);
     // Contains exclusively the fired timer
     assertThat(firedForKey.getTimers(), contains(toFire));
 
@@ -283,7 +271,9 @@ public void extractFiredTimersExtractsTimers() {
   public void createKeyedBundleKeyed() {
     StructuralKey<String> key = StructuralKey.of("foo", StringUtf8Coder.of());
     CommittedBundle<KV<String, Integer>> keyedBundle =
-        context.createKeyedBundle(key, downstream).commit(Instant.now());
+        context
+            .<String, KV<String, Integer>>createKeyedBundle(key, downstream)
+            .commit(Instant.now());
     assertThat(keyedBundle.getKey(), Matchers.equalTo(key));
   }
 
@@ -317,7 +307,7 @@ public void isDoneWithPartiallyDone() {
         null, ImmutableList.of(), StepTransformResult.withoutHold(unboundedProducer).build());
     assertThat(context.isDone(), is(false));
 
-    for (AppliedPTransform<?, ?, ?> consumers : graph.getPerElementConsumers(created)) {
+    for (PTransformNode consumers : graph.getPerElementConsumers(created)) {
       context.handleResult(
           committedBundle, ImmutableList.of(), StepTransformResult.withoutHold(consumers).build());
     }
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ExecutableGraphBuilder.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ExecutableGraphBuilder.java
new file mode 100644
index 00000000000..b5f6c19300f
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ExecutableGraphBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.direct.ExecutableGraph;
+
+/**
+ * A builder of simple {@link ExecutableGraph ExecutableGraphs} suitable for use in the portable
+ * direct runner, reducing the verbosity of creating a graph whose payloads carry no meaning.
+ */
+public class ExecutableGraphBuilder {
+  private final RunnerApi.Components.Builder components;
+
+  private ExecutableGraphBuilder() {
+    components = Components.newBuilder();
+  }
+
+  public static ExecutableGraphBuilder create() {
+    return new ExecutableGraphBuilder();
+  }
+
+  public ExecutableGraphBuilder addTransform(
+      String name, @Nullable String input, String... outputs) {
+    PTransform.Builder pt = PTransform.newBuilder().setUniqueName(name);
+    if (input != null) {
+      pt = pt.putInputs("input", input);
+      addPCollection(input);
+    }
+    for (String output : outputs) {
+      pt = pt.putOutputs(output, output);
+      addPCollection(output);
+    }
+    components.putTransforms(name, pt.build());
+    return this;
+  }
+
+  private ExecutableGraphBuilder addPCollection(String name) {
+    components.putPcollections(name, PCollection.newBuilder().setUniqueName(name).build());
+    return this;
+  }
+
+  public PTransformNode transformNode(String name) {
+    return PipelineNode.pTransform(name, components.getTransformsOrThrow(name));
+  }
+
+  public PCollectionNode collectionNode(String name) {
+    return PipelineNode.pCollection(name, components.getPcollectionsOrThrow(name));
+  }
+
+  public ExecutableGraph<PTransformNode, PCollectionNode> toGraph() {
+    return PortableGraph.forPipeline(
+        Pipeline.newBuilder()
+            .setComponents(components)
+            .addAllRootTransformIds(components.getTransformsMap().keySet())
+            .build());
+  }
+}
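
For context, a minimal sketch of how a test might use the new ExecutableGraphBuilder to stand up a
payload-free graph and look up its nodes, mirroring the setup() pattern in EvaluationContextTest
above. The transform and collection names here ("create", "consume", "consumed") are illustrative
only and do not come from the PR:

import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
import org.apache.beam.runners.direct.ExecutableGraph;
import org.apache.beam.runners.direct.portable.ExecutableGraphBuilder;

class ExecutableGraphBuilderExample {
  void example() {
    // Build a two-transform graph with no payloads of any meaning.
    ExecutableGraphBuilder builder =
        ExecutableGraphBuilder.create()
            .addTransform("create", null, "created")          // root transform: no input
            .addTransform("consume", "created", "consumed");  // consumes the "created" PCollection

    // Convert to an ExecutableGraph and look up the proto-backed nodes by name.
    ExecutableGraph<PTransformNode, PCollectionNode> graph = builder.toGraph();
    PTransformNode producer = builder.transformNode("create");
    PCollectionNode createdNode = builder.collectionNode("created");
  }
}

Note that toGraph() registers every added transform as a root transform id, which keeps the builder
simple for these payload-free test graphs.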
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/FlattenEvaluatorFactoryTest.java
index 816254b056e..f3177cdf8ab 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/FlattenEvaluatorFactoryTest.java
@@ -18,25 +18,23 @@
 package org.apache.beam.runners.direct.portable;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import com.google.common.collect.Iterables;
-import org.apache.beam.runners.direct.DirectGraphs;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -49,30 +47,44 @@
 
   @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
+  @Ignore("TODO: BEAM-4240 Enable when the Flatten Evaluator Factory is fully migrated")
   @Test
   public void testFlattenInMemoryEvaluator() throws Exception {
-    PCollection<Integer> left = p.apply("left", Create.of(1, 2, 4));
-    PCollection<Integer> right = p.apply("right", Create.of(-1, 2, -4));
-    PCollectionList<Integer> list = PCollectionList.of(left).and(right);
-
-    PCollection<Integer> flattened = list.apply(Flatten.pCollections());
-
-    CommittedBundle<Integer> leftBundle = bundleFactory.createBundle(left).commit(Instant.now());
-    CommittedBundle<Integer> rightBundle = bundleFactory.createBundle(right).commit(Instant.now());
+    PCollectionNode left =
+        PipelineNode.pCollection("left", PCollection.newBuilder().setUniqueName("left").build());
+    PCollectionNode right =
+        PipelineNode.pCollection("right", PCollection.newBuilder().setUniqueName("right").build());
+
+    PTransformNode flatten =
+        PipelineNode.pTransform(
+            "flatten",
+            PTransform.newBuilder()
+                .setUniqueName("flatten")
+                .setSpec(
+                    FunctionSpec.newBuilder().setUrn(PTransformTranslation.FLATTEN_TRANSFORM_URN))
+                .build());
+
+    PCollectionNode flattened =
+        PipelineNode.pCollection("flat", PCollection.newBuilder().setUniqueName("flat").build());
+
+    CommittedBundle<Integer> leftBundle =
+        bundleFactory.<Integer>createBundle(left).commit(Instant.now());
+    CommittedBundle<Integer> rightBundle =
+        bundleFactory.<Integer>createBundle(right).commit(Instant.now());
 
     EvaluationContext context = mock(EvaluationContext.class);
 
     UncommittedBundle<Integer> flattenedLeftBundle = bundleFactory.createBundle(flattened);
     UncommittedBundle<Integer> flattenedRightBundle = bundleFactory.createBundle(flattened);
 
-    when(context.createBundle(flattened)).thenReturn(flattenedLeftBundle, flattenedRightBundle);
+    when(context.<Integer>createBundle(flattened))
+        .thenReturn(flattenedLeftBundle, flattenedRightBundle);
 
     FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(context);
-    AppliedPTransform<?, ?, ?> flattenedProducer = DirectGraphs.getProducer(flattened);
     TransformEvaluator<Integer> leftSideEvaluator =
-        factory.forApplication(flattenedProducer, leftBundle);
+        factory.forApplication(flatten, leftBundle);
     TransformEvaluator<Integer> rightSideEvaluator =
-        factory.forApplication(flattenedProducer, rightBundle);
+        factory.forApplication(flatten, rightBundle);
 
     leftSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(1));
     rightSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1));
@@ -88,13 +100,9 @@ public void testFlattenInMemoryEvaluator() throws Exception {
     TransformResult<Integer> leftSideResult = leftSideEvaluator.finishBundle();
 
     assertThat(rightSideResult.getOutputBundles(), Matchers.contains(flattenedRightBundle));
-    assertThat(
-        rightSideResult.getTransform(),
-        Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattenedProducer));
+    assertThat(rightSideResult.getTransform(), Matchers.equalTo(flatten));
     assertThat(leftSideResult.getOutputBundles(), Matchers.contains(flattenedLeftBundle));
-    assertThat(
-        leftSideResult.getTransform(),
-        Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattenedProducer));
+    assertThat(leftSideResult.getTransform(), Matchers.equalTo(flatten));
 
     assertThat(
         flattenedLeftBundle.commit(Instant.now()).getElements(),
@@ -109,32 +117,4 @@ public void testFlattenInMemoryEvaluator() throws Exception {
             WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096)),
             WindowedValue.valueInGlobalWindow(-1)));
   }
-
-  @Test
-  public void testFlattenInMemoryEvaluatorWithEmptyPCollectionList() throws Exception {
-    PCollectionList<Integer> list = PCollectionList.empty(p);
-
-    PCollection<Integer> flattened = list.apply(Flatten.pCollections());
-    flattened.setCoder(VarIntCoder.of());
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    when(evaluationContext.createBundle(flattened))
-        .thenReturn(bundleFactory.createBundle(flattened));
-
-    FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(evaluationContext);
-    AppliedPTransform<?, ?, ?> flattendProducer = DirectGraphs.getProducer(flattened);
-    TransformEvaluator<Integer> emptyEvaluator =
-        factory.forApplication(
-            flattendProducer,
-            bundleFactory.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
-
-    TransformResult<Integer> leftSideResult = emptyEvaluator.finishBundle();
-
-    CommittedBundle<?> outputBundle =
-        Iterables.getOnlyElement(leftSideResult.getOutputBundles()).commit(Instant.now());
-    assertThat(outputBundle.getElements(), emptyIterable());
-    assertThat(
-        leftSideResult.getTransform(),
-        Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattendProducer));
-  }
 }
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/GroupByKeyEvaluatorFactoryTest.java
deleted file mode 100644
index b18d1603a52..00000000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/GroupByKeyEvaluatorFactoryTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct.portable;
-
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multiset;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.direct.DirectGraphs;
-import org.apache.beam.runners.direct.portable.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.local.StructuralKey;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link GroupByKeyEvaluatorFactory}. */
-@RunWith(JUnit4.class)
-public class GroupByKeyEvaluatorFactoryTest {
-  private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
-
-  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
-
-  @Test
-  public void testInMemoryEvaluator() throws Exception {
-    KV<String, Integer> firstFoo = KV.of("foo", -1);
-    KV<String, Integer> secondFoo = KV.of("foo", 1);
-    KV<String, Integer> thirdFoo = KV.of("foo", 3);
-    KV<String, Integer> firstBar = KV.of("bar", 22);
-    KV<String, Integer> secondBar = KV.of("bar", 12);
-    KV<String, Integer> firstBaz = KV.of("baz", Integer.MAX_VALUE);
-    PCollection<KV<String, Integer>> values =
-        p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo));
-    PCollection<KeyedWorkItem<String, Integer>> groupedKvs =
-        values.apply(new DirectGroupByKeyOnly<>());
-
-    CommittedBundle<KV<String, Integer>> inputBundle =
-        bundleFactory.createBundle(values).commit(Instant.now());
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    StructuralKey<String> fooKey = StructuralKey.of("foo", StringUtf8Coder.of());
-    UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle =
-        bundleFactory.createKeyedBundle(fooKey, groupedKvs);
-
-    StructuralKey<String> barKey = StructuralKey.of("bar", StringUtf8Coder.of());
-    UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle =
-        bundleFactory.createKeyedBundle(barKey, groupedKvs);
-
-    StructuralKey<String> bazKey = StructuralKey.of("baz", StringUtf8Coder.of());
-    UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
-        bundleFactory.createKeyedBundle(bazKey, groupedKvs);
-
-    when(evaluationContext.createKeyedBundle(fooKey, groupedKvs)).thenReturn(fooBundle);
-    when(evaluationContext.createKeyedBundle(barKey, groupedKvs)).thenReturn(barBundle);
-    when(evaluationContext.createKeyedBundle(bazKey, groupedKvs)).thenReturn(bazBundle);
-
-    // The input to a GroupByKey is assumed to be a KvCoder
-    @SuppressWarnings("unchecked")
-    Coder<String> keyCoder = ((KvCoder<String, Integer>) values.getCoder()).getKeyCoder();
-    TransformEvaluator<KV<String, Integer>> evaluator =
-        new GroupByKeyOnlyEvaluatorFactory(evaluationContext)
-            .forApplication(DirectGraphs.getProducer(groupedKvs), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo));
-    evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo));
-    evaluator.processElement(WindowedValue.valueInGlobalWindow(thirdFoo));
-    evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBar));
-    evaluator.processElement(WindowedValue.valueInGlobalWindow(secondBar));
-    evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBaz));
-
-    evaluator.finishBundle();
-
-    assertThat(
-        fooBundle.commit(Instant.now()).getElements(),
-        contains(
-            new KeyedWorkItemMatcher<>(
-                KeyedWorkItems.elementsWorkItem(
-                    "foo",
-                    ImmutableSet.of(
-                        WindowedValue.valueInGlobalWindow(-1),
-                        WindowedValue.valueInGlobalWindow(1),
-                        WindowedValue.valueInGlobalWindow(3))),
-                keyCoder)));
-    assertThat(
-        barBundle.commit(Instant.now()).getElements(),
-        contains(
-            new KeyedWorkItemMatcher<>(
-                KeyedWorkItems.elementsWorkItem(
-                    "bar",
-                    ImmutableSet.of(
-                        WindowedValue.valueInGlobalWindow(12),
-                        WindowedValue.valueInGlobalWindow(22))),
-                keyCoder)));
-    assertThat(
-        bazBundle.commit(Instant.now()).getElements(),
-        contains(
-            new KeyedWorkItemMatcher<>(
-                KeyedWorkItems.elementsWorkItem(
-                    "baz", ImmutableSet.of(WindowedValue.valueInGlobalWindow(Integer.MAX_VALUE))),
-                keyCoder)));
-  }
-
-  private static class KeyedWorkItemMatcher<K, V>
-      extends BaseMatcher<WindowedValue<KeyedWorkItem<K, V>>> {
-    private final KeyedWorkItem<K, V> myWorkItem;
-    private final Coder<K> keyCoder;
-
-    KeyedWorkItemMatcher(KeyedWorkItem<K, V> myWorkItem, Coder<K> keyCoder) {
-      this.myWorkItem = myWorkItem;
-      this.keyCoder = keyCoder;
-    }
-
-    @Override
-    public boolean matches(Object item) {
-      if (item == null || !(item instanceof WindowedValue)) {
-        return false;
-      }
-      WindowedValue<KeyedWorkItem<K, V>> that = (WindowedValue<KeyedWorkItem<K, V>>) item;
-      Multiset<WindowedValue<V>> myValues = HashMultiset.create();
-      Multiset<WindowedValue<V>> thatValues = HashMultiset.create();
-      for (WindowedValue<V> value : myWorkItem.elementsIterable()) {
-        myValues.add(value);
-      }
-      for (WindowedValue<V> value : that.getValue().elementsIterable()) {
-        thatValues.add(value);
-      }
-      try {
-        return myValues.equals(thatValues)
-            && keyCoder
-                .structuralValue(myWorkItem.key())
-                .equals(keyCoder.structuralValue(that.getValue().key()));
-      } catch (Exception e) {
-        return false;
-      }
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description
-          .appendText("KeyedWorkItem<K, V> containing key ")
-          .appendValue(myWorkItem.key())
-          .appendText(" and values ")
-          .appendValueList("[", ", ", "]", myWorkItem.elementsIterable());
-    }
-  }
-}
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactoryTest.java
index ec2d3bbbfe9..64541254f16 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactoryTest.java
@@ -25,22 +25,26 @@
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multiset;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.direct.DirectGraphs;
-import org.apache.beam.runners.direct.portable.DirectGroupByKey.DirectGroupByKeyOnly;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.local.StructuralKey;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.joda.time.Instant;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -54,6 +58,7 @@
   @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
   @Test
+  @Ignore("TODO: BEAM-4240 Not yet migrated")
   public void testInMemoryEvaluator() throws Exception {
     KV<String, Integer> firstFoo = KV.of("foo", -1);
     KV<String, Integer> secondFoo = KV.of("foo", 1);
@@ -61,13 +66,31 @@ public void testInMemoryEvaluator() throws Exception {
     KV<String, Integer> firstBar = KV.of("bar", 22);
     KV<String, Integer> secondBar = KV.of("bar", 12);
     KV<String, Integer> firstBaz = KV.of("baz", Integer.MAX_VALUE);
-    PCollection<KV<String, Integer>> values =
-        p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo));
-    PCollection<KeyedWorkItem<String, Integer>> groupedKvs =
-        values.apply(new DirectGroupByKeyOnly<>());
+
+    KvCoder<String, Integer> kvCoder =
+        KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of());
+
+    PCollectionNode values =
+        PipelineNode.pCollection(
+            "values",
+            RunnerApi.PCollection.newBuilder()
+                .setUniqueName("values")
+                .setCoderId("kvCoder")
+                .build());
+    PCollectionNode groupedKvs =
+        PipelineNode.pCollection(
+            "groupedKvs", RunnerApi.PCollection.newBuilder().setUniqueName("groupedKvs").build());
+    PTransformNode groupByKeyOnly =
+        PipelineNode.pTransform(
+            "gbko",
+            PTransform.newBuilder()
+                .putInputs("input", "values")
+                .putOutputs("output", "groupedKvs")
+                .setSpec(FunctionSpec.newBuilder().setUrn(DirectGroupByKey.DIRECT_GBKO_URN).build())
+                .build());
 
     CommittedBundle<KV<String, Integer>> inputBundle =
-        bundleFactory.createBundle(values).commit(Instant.now());
+        bundleFactory.<KV<String, Integer>>createBundle(values).commit(Instant.now());
     EvaluationContext evaluationContext = mock(EvaluationContext.class);
 
     StructuralKey<String> fooKey = StructuralKey.of("foo", StringUtf8Coder.of());
@@ -80,16 +103,22 @@ public void testInMemoryEvaluator() throws Exception {
     UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
         bundleFactory.createKeyedBundle(bazKey, groupedKvs);
 
-    when(evaluationContext.createKeyedBundle(fooKey, groupedKvs)).thenReturn(fooBundle);
-    when(evaluationContext.createKeyedBundle(barKey, groupedKvs)).thenReturn(barBundle);
-    when(evaluationContext.createKeyedBundle(bazKey, groupedKvs)).thenReturn(bazBundle);
+    when(evaluationContext.<String, KeyedWorkItem<String, Integer>>createKeyedBundle(
+            fooKey, groupedKvs))
+        .thenReturn(fooBundle);
+    when(evaluationContext.<String, KeyedWorkItem<String, Integer>>createKeyedBundle(
+            barKey, groupedKvs))
+        .thenReturn(barBundle);
+    when(evaluationContext.<String, KeyedWorkItem<String, Integer>>createKeyedBundle(
+            bazKey, groupedKvs))
+        .thenReturn(bazBundle);
 
     // The input to a GroupByKey is assumed to be a KvCoder
     @SuppressWarnings("unchecked")
-    Coder<String> keyCoder = ((KvCoder<String, Integer>) values.getCoder()).getKeyCoder();
+    Coder<String> keyCoder = kvCoder.getKeyCoder();
     TransformEvaluator<KV<String, Integer>> evaluator =
         new GroupByKeyOnlyEvaluatorFactory(evaluationContext)
-            .forApplication(DirectGraphs.getProducer(groupedKvs), inputBundle);
+            .forApplication(groupByKeyOnly, inputBundle);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo));
     evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo));
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ImmutableListBundleFactoryTest.java
index 9b51d3f348e..91d0c7be07e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ImmutableListBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ImmutableListBundleFactoryTest.java
@@ -26,6 +26,9 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -33,14 +36,11 @@
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
@@ -60,20 +60,23 @@
 
   private ImmutableListBundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
-  private PCollection<Integer> created;
-  private PCollection<KV<String, Integer>> downstream;
+  private PCollectionNode created;
+  private PCollectionNode downstream;
 
   @Before
   public void setup() {
-    created = p.apply(Create.of(1, 2, 3));
-    downstream = created.apply(WithKeys.of("foo"));
+    created =
+        PipelineNode.pCollection(
+            "created", RunnerApi.PCollection.newBuilder().setUniqueName("created").build());
+    downstream =
+        PipelineNode.pCollection(
+            "downstream", RunnerApi.PCollection.newBuilder().setUniqueName("downstream").build());
   }
 
   private <T> void createKeyedBundle(Coder<T> coder, T key) throws Exception {
-    PCollection<Integer> pcollection = p.apply("Create", Create.of(1));
     StructuralKey<?> skey = StructuralKey.of(key, coder);
 
-    UncommittedBundle<Integer> inFlightBundle = bundleFactory.createKeyedBundle(skey, pcollection);
+    UncommittedBundle<Integer> inFlightBundle = bundleFactory.createKeyedBundle(skey, created);
 
     CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
     assertThat(bundle.getKey(), Matchers.equalTo(skey));
@@ -223,7 +226,8 @@ public void commitAfterCommitShouldThrowException() {
   public void createKeyedBundleKeyed() {
     CommittedBundle<KV<String, Integer>> keyedBundle =
         bundleFactory
-            .createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), downstream)
+            .<String, KV<String, Integer>>createKeyedBundle(
+                StructuralKey.of("foo", StringUtf8Coder.of()), downstream)
             .commit(Instant.now());
     assertThat(keyedBundle.getKey().getKey(), Matchers.equalTo("foo"));
   }
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ImpulseEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ImpulseEvaluatorFactoryTest.java
index fcf7a2b5f0f..7d0fc53f9bd 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ImpulseEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ImpulseEvaluatorFactoryTest.java
@@ -27,18 +27,21 @@
 
 import com.google.common.collect.Iterables;
 import java.util.Collection;
-import org.apache.beam.runners.direct.DirectGraphs;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.direct.portable.ImpulseEvaluatorFactory.ImpulseRootProvider;
 import org.apache.beam.runners.direct.portable.ImpulseEvaluatorFactory.ImpulseShard;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -47,9 +50,20 @@
 
 /** Tests for {@link ImpulseEvaluatorFactory}. */
 @RunWith(JUnit4.class)
+@Ignore("Not yet migrated")
 public class ImpulseEvaluatorFactoryTest {
   private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
+  private PTransformNode impulseApplication =
+      PipelineNode.pTransform(
+          "impulse",
+          PTransform.newBuilder()
+              .setSpec(
+                  FunctionSpec.newBuilder()
+                      .setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN)
+                      .build())
+              .build());
+
   @Mock private EvaluationContext context;
 
   @Before
@@ -59,10 +73,9 @@ public void setup() {
 
   @Test
   public void testImpulse() throws Exception {
-    Pipeline p = Pipeline.create();
-    PCollection<byte[]> impulseOut = p.apply(Impulse.create());
-
-    AppliedPTransform<?, ?, ?> impulseApplication = DirectGraphs.getProducer(impulseOut);
+    PCollectionNode impulseOut =
+        PipelineNode.pCollection(
+            "impulse.out", RunnerApi.PCollection.newBuilder().setUniqueName("impulse.out").build());
 
     ImpulseEvaluatorFactory factory = new ImpulseEvaluatorFactory(context);
 
@@ -99,18 +112,11 @@ public void testImpulse() throws Exception {
 
   @Test
   public void testRootProvider() {
-    Pipeline p = Pipeline.create();
-    PCollection<byte[]> impulseOut = p.apply(Impulse.create());
-    // Add a second impulse to demonstrate no crosstalk between applications
-    @SuppressWarnings("unused")
-    PCollection<byte[]> impulseOutTwo = p.apply(Impulse.create());
-    AppliedPTransform<?, ?, ?> impulseApplication = DirectGraphs.getProducer(impulseOut);
-
     ImpulseRootProvider rootProvider = new ImpulseRootProvider(context);
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
 
-    Collection<CommittedBundle<?>> inputs =
-        rootProvider.getInitialInputs((AppliedPTransform) impulseApplication, 100);
+    Collection<? extends CommittedBundle<?>> inputs =
+        rootProvider.getInitialInputs(impulseApplication, 100);
 
     assertThat("Only one impulse bundle per application", inputs, hasSize(1));
     assertThat(
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/MockClock.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/MockClock.java
index acad3e7ceb3..1cc9e33b4e6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/MockClock.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/MockClock.java
@@ -19,6 +19,7 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.beam.runners.direct.Clock;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/PortableGraphTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/PortableGraphTest.java
similarity index 99%
rename from runners/direct-java/src/test/java/org/apache/beam/runners/direct/PortableGraphTest.java
rename to runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/PortableGraphTest.java
index 010aaac3b7c..df6e57d4447 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/PortableGraphTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/PortableGraphTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.beam.runners.direct;
+package org.apache.beam.runners.direct.portable;
 
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static org.hamcrest.Matchers.containsInAnyOrder;
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/StepTransformResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/StepTransformResultTest.java
index 8a640b62193..76890bb3d27 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/StepTransformResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/StepTransformResultTest.java
@@ -23,13 +23,13 @@
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertThat;
 
-import org.apache.beam.runners.direct.DirectGraphs;
-import org.apache.beam.runners.direct.ExecutableGraph;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.direct.portable.CommittedResult.OutputType;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Rule;
@@ -40,18 +40,19 @@
 /** Tests for {@link StepTransformResult}. */
 @RunWith(JUnit4.class)
 public class StepTransformResultTest {
-  private AppliedPTransform<?, ?, ?> transform;
+  private PTransformNode transform;
   private BundleFactory bundleFactory;
-  private PCollection<Integer> pc;
+  private PCollectionNode pc;
 
   @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
   @Before
   public void setup() {
-    pc = p.apply(Create.of(1, 2, 3));
-    ExecutableGraph<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> graph =
-        DirectGraphs.getGraph(p);
-    transform = graph.getProducer(pc);
+    pc =
+        PipelineNode.pCollection(
+            "pc", RunnerApi.PCollection.newBuilder().setUniqueName("pc").build());
+    transform =
+        PipelineNode.pTransform("pt", PTransform.newBuilder().putOutputs("out", "pc").build());
 
     bundleFactory = ImmutableListBundleFactory.create();
   }
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/WatermarkCallbackExecutorTest.java
index a55ea93d4dc..ac698755c8f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/WatermarkCallbackExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/WatermarkCallbackExecutorTest.java
@@ -23,23 +23,18 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.direct.DirectGraphs;
-import org.apache.beam.runners.direct.ExecutableGraph;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -49,20 +44,27 @@
 public class WatermarkCallbackExecutorTest {
   private WatermarkCallbackExecutor executor =
       WatermarkCallbackExecutor.create(Executors.newSingleThreadExecutor());
-  private AppliedPTransform<?, ?, ?> create;
-  private AppliedPTransform<?, ?, ?> sum;
-
-  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+  private PTransformNode create;
+  private PTransformNode sum;
 
   @Before
   public void setup() {
-    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
-    PCollection<Integer> summed = created.apply(Sum.integersGlobally());
-    DirectGraphs.performDirectOverrides(p);
-    ExecutableGraph<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> graph =
-        DirectGraphs.getGraph(p);
-    create = graph.getProducer(created);
-    sum = graph.getProducer(summed);
+    create =
+        PipelineNode.pTransform(
+            "create",
+            PTransform.newBuilder()
+                .setUniqueName("create")
+                .putInputs("in", "impulse.out")
+                .putOutputs("out", "create.out")
+                .build());
+    sum =
+        PipelineNode.pTransform(
+            "sum",
+            PTransform.newBuilder()
+                .setUniqueName("sum")
+                .putInputs("in", "create.in")
+                .putOutputs("out", "sum.out")
+                .build());
   }
 
   @Test
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/WatermarkManagerTest.java
deleted file mode 100644
index 08746f70e46..00000000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/WatermarkManagerTest.java
+++ /dev/null
@@ -1,1688 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct.portable;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.lessThan;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.direct.DirectGraphs;
-import org.apache.beam.runners.direct.ExecutableGraph;
-import org.apache.beam.runners.direct.portable.WatermarkManager.AppliedPTransformInputWatermark;
-import org.apache.beam.runners.direct.portable.WatermarkManager.FiredTimers;
-import org.apache.beam.runners.direct.portable.WatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.portable.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
-import org.apache.beam.runners.direct.portable.WatermarkManager.TransformWatermarks;
-import org.apache.beam.runners.direct.portable.WatermarkManager.Watermark;
-import org.apache.beam.runners.local.StructuralKey;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-
-/**
- * Tests for {@link WatermarkManager}.
- */
-@RunWith(JUnit4.class)
-public class WatermarkManagerTest implements Serializable {
-  @Rule
-  public transient ExpectedException thrown = ExpectedException.none();
-
-  private transient MockClock clock;
-
-  private transient PCollection<Integer> createdInts;
-
-  private transient PCollection<Integer> filtered;
-  private transient PCollection<Integer> filteredTimesTwo;
-  private transient PCollection<KV<String, Integer>> keyed;
-
-  private transient PCollection<Integer> intsToFlatten;
-  private transient PCollection<Integer> flattened;
-
-  private transient WatermarkManager<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> manager;
-  private transient BundleFactory bundleFactory;
-  private ExecutableGraph<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> graph;
-
-  @Rule
-  public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
-
-  @Before
-  public void setup() {
-
-    createdInts = p.apply("createdInts", Create.of(1, 2, 3));
-
-    filtered = createdInts.apply("filtered", Filter.greaterThan(1));
-    filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new DoFn<Integer, Integer>() {
-      @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
-        c.output(c.element() * 2);
-      }
-    }));
-
-    keyed = createdInts.apply("keyed", WithKeys.of("MyKey"));
-
-    intsToFlatten = p.apply("intsToFlatten", Create.of(-1, 256, 65535));
-    PCollectionList<Integer> preFlatten = PCollectionList.of(createdInts).and(intsToFlatten);
-    flattened = preFlatten.apply("flattened", Flatten.pCollections());
-
-    clock = MockClock.fromInstant(new Instant(1000));
-    DirectGraphs.performDirectOverrides(p);
-    graph = DirectGraphs.getGraph(p);
-
-    manager = WatermarkManager.create(clock, graph);
-    bundleFactory = ImmutableListBundleFactory.create();
-  }
-
-  /**
-   * Demonstrates that getWatermark, when called on an {@link AppliedPTransform} that has not
-   * processed any elements, returns the {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
-   */
-  @Test
-  public void getWatermarkForUntouchedTransform() {
-    TransformWatermarks watermarks = manager.getWatermarks(graph.getProducer(createdInts));
-
-    assertThat(watermarks.getInputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
-    assertThat(watermarks.getOutputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
-  }
-
-  /**
-   * Demonstrates that getWatermark for a transform that consumes no input uses the Watermark
-   * Hold value provided to it as the output watermark.
-   */
-  @Test
-  public void getWatermarkForUpdatedSourceTransform() {
-    CommittedBundle<Integer> output = multiWindowedBundle(createdInts, 1);
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.singleton(output),
-        new Instant(8000L));
-    manager.refreshAll();
-    TransformWatermarks updatedSourceWatermark =
-        manager.getWatermarks(graph.getProducer(createdInts));
-
-    assertThat(updatedSourceWatermark.getOutputWatermark(), equalTo(new Instant(8000L)));
-  }
-
-  /**
-   * Demonstrates that getWatermark for a transform that takes multiple inputs is held to the
-   * minimum watermark across all of its inputs.
-   */
-  @Test
-  public void getWatermarkForMultiInputTransform() {
-    CommittedBundle<Integer> secondPcollectionBundle = multiWindowedBundle(intsToFlatten, -1);
-
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(intsToFlatten),
-        null,
-        Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-
-    // We didn't do anything for the first source, so we shouldn't have progressed the watermark
-    TransformWatermarks firstSourceWatermark =
-        manager.getWatermarks(graph.getProducer(createdInts));
-    assertThat(
-        firstSourceWatermark.getOutputWatermark(),
-        not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-
-    // The second source output all of its elements, so it should be done (with a watermark at the
-    // end of time).
-    TransformWatermarks secondSourceWatermark =
-        manager.getWatermarks(graph.getProducer(intsToFlatten));
-    assertThat(
-        secondSourceWatermark.getOutputWatermark(),
-        not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-
-    // We haven't consumed anything yet, so our watermark should be at the beginning of time
-    TransformWatermarks transformWatermark = manager.getWatermarks(graph.getProducer(flattened));
-    assertThat(
-        transformWatermark.getInputWatermark(),
-        not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-    assertThat(
-        transformWatermark.getOutputWatermark(),
-        not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-
-    CommittedBundle<Integer> flattenedBundleSecondCreate = multiWindowedBundle(flattened, -1);
-    // We have finished processing the bundle from the second PCollection, but we haven't consumed
-    // anything from the first PCollection yet; so our watermark shouldn't advance
-    manager.updateWatermarks(
-        secondPcollectionBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(flattened),
-        secondPcollectionBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    TransformWatermarks transformAfterProcessing =
-        manager.getWatermarks(graph.getProducer(flattened));
-    manager.updateWatermarks(
-        secondPcollectionBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(flattened),
-        secondPcollectionBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-    assertThat(
-        transformAfterProcessing.getInputWatermark(),
-        not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-    assertThat(
-        transformAfterProcessing.getOutputWatermark(),
-        not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-
-    Instant firstCollectionTimestamp = new Instant(10000);
-    CommittedBundle<Integer> firstPcollectionBundle =
-        timestampedBundle(createdInts, TimestampedValue.of(5, firstCollectionTimestamp));
-    // the source is done, but elements are still buffered. The source output watermark should be
-    // past the end of the global window
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-    TransformWatermarks firstSourceWatermarks =
-        manager.getWatermarks(graph.getProducer(createdInts));
-    assertThat(
-        firstSourceWatermarks.getOutputWatermark(),
-        not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-
-    // We still haven't consumed any of the first source's input, so the watermark should still not
-    // progress
-    TransformWatermarks flattenAfterSourcesProduced =
-        manager.getWatermarks(graph.getProducer(flattened));
-    assertThat(
-        flattenAfterSourcesProduced.getInputWatermark(),
-        not(greaterThan(firstCollectionTimestamp)));
-    assertThat(
-        flattenAfterSourcesProduced.getOutputWatermark(),
-        not(greaterThan(firstCollectionTimestamp)));
-
-    // We have buffered inputs, but since the PCollection has all of the elements (has a WM past the
-    // end of the global window), we should have a watermark equal to the min among buffered
-    // elements
-    TransformWatermarks withBufferedElements = manager.getWatermarks(graph.getProducer(flattened));
-    assertThat(withBufferedElements.getInputWatermark(), equalTo(firstCollectionTimestamp));
-    assertThat(withBufferedElements.getOutputWatermark(), equalTo(firstCollectionTimestamp));
-
-    CommittedBundle<?> completedFlattenBundle =
-        bundleFactory.createBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.updateWatermarks(
-        firstPcollectionBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(flattened),
-        firstPcollectionBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(completedFlattenBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-    TransformWatermarks afterConsumingAllInput =
-        manager.getWatermarks(graph.getProducer(flattened));
-    assertThat(
-        afterConsumingAllInput.getInputWatermark(),
-        not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    assertThat(
-        afterConsumingAllInput.getOutputWatermark(),
-        not(greaterThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-  }
-
-  /**
-   * Demonstrates that getWatermark for a transform that consumes a single input {@link
-   * PCollection} multiple times is not advanced until that input has been consumed once for each
-   * of its (identical) inputs.
-   */
-  @Test
-  public void getWatermarkMultiIdenticalInput() {
-    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
-    PCollection<Integer> multiConsumer =
-        PCollectionList.of(created).and(created).apply(Flatten.pCollections());
-    ExecutableGraph<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> graph =
-        DirectGraphs.getGraph(p);
-
-    AppliedPTransform<?, ?, ?> theFlatten = graph.getProducer(multiConsumer);
-
-    WatermarkManager<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> tstMgr =
-        WatermarkManager.create(clock, graph);
-    CommittedBundle<Void> root =
-        bundleFactory
-            .<Void>createRootBundle()
-            .add(WindowedValue.valueInGlobalWindow(null))
-            .commit(clock.now());
-    CommittedBundle<Integer> createBundle =
-        bundleFactory
-            .createBundle(created)
-            .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(33536)))
-            .commit(clock.now());
-
-    Map<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>> initialInputs =
-        ImmutableMap.<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>>builder()
-            .put(graph.getProducer(created), Collections.singleton(root))
-            .build();
-    tstMgr.initialize((Map) initialInputs);
-    tstMgr.updateWatermarks(
-        root,
-        TimerUpdate.empty(),
-        graph.getProducer(created),
-        null,
-        Collections.singleton(createBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    tstMgr.refreshAll();
-    TransformWatermarks flattenWms = tstMgr.getWatermarks(theFlatten);
-    assertThat(flattenWms.getInputWatermark(), equalTo(new Instant(33536)));
-
-    tstMgr.updateWatermarks(
-        createBundle,
-        TimerUpdate.empty(),
-        theFlatten,
-        null,
-        Collections.emptyList(),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    tstMgr.refreshAll();
-    assertThat(flattenWms.getInputWatermark(), equalTo(new Instant(33536)));
-
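-    // Consume the bundle for the second of the two identical inputs; only then can the flatten's
-    // input watermark advance past the buffered element.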
-    tstMgr.updateWatermarks(
-        createBundle,
-        TimerUpdate.empty(),
-        theFlatten,
-        null,
-        Collections.emptyList(),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    tstMgr.refreshAll();
-    assertThat(flattenWms.getInputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-  }
-
-  /**
-   * Demonstrates that pending elements are independent among
-   * {@link AppliedPTransform AppliedPTransforms} that consume the same input {@link PCollection}.
-   */
-  @Test
-  public void getWatermarkForMultiConsumedCollection() {
-    CommittedBundle<Integer> createdBundle =
-        timestampedBundle(
-            createdInts,
-            TimestampedValue.of(1, new Instant(1_000_000L)),
-            TimestampedValue.of(2, new Instant(1234L)),
-            TimestampedValue.of(3, new Instant(-1000L)));
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(createdBundle),
-        new Instant(Long.MAX_VALUE));
-    manager.refreshAll();
-    TransformWatermarks createdAfterProducing =
-        manager.getWatermarks(graph.getProducer(createdInts));
-    assertThat(
-        createdAfterProducing.getOutputWatermark(),
-        not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-
-    CommittedBundle<KV<String, Integer>> keyBundle =
-        timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
-            TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
-            TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
-    manager.updateWatermarks(
-        createdBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(keyed),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(keyBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-    TransformWatermarks keyedWatermarks =
-        manager.getWatermarks(graph.getProducer(keyed));
-    assertThat(
-        keyedWatermarks.getInputWatermark(), not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    assertThat(
-        keyedWatermarks.getOutputWatermark(), not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-
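-    // The filtered transform has not yet consumed the created bundle, so its watermarks remain
-    // held by the pending elements (earliest at -1000L).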
-    TransformWatermarks filteredWatermarks =
-        manager.getWatermarks(graph.getProducer(filtered));
-    assertThat(filteredWatermarks.getInputWatermark(), not(greaterThan(new Instant(-1000L))));
-    assertThat(filteredWatermarks.getOutputWatermark(), not(greaterThan(new Instant(-1000L))));
-
-    CommittedBundle<Integer> filteredBundle =
-        timestampedBundle(filtered, TimestampedValue.of(2, new Instant(1234L)));
-    manager.updateWatermarks(
-        createdBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(filtered),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(filteredBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-    TransformWatermarks filteredProcessedWatermarks =
-        manager.getWatermarks(graph.getProducer(filtered));
-    assertThat(
-        filteredProcessedWatermarks.getInputWatermark(),
-        not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    assertThat(
-        filteredProcessedWatermarks.getOutputWatermark(),
-        not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-  }
-
-  /**
-   * Demonstrates that the watermark of an {@link AppliedPTransform} is held to the provided
-   * watermark hold.
-   */
-  @Test
-  public void updateWatermarkWithWatermarkHolds() {
-    CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
-        TimestampedValue.of(1, new Instant(1_000_000L)),
-        TimestampedValue.of(2, new Instant(1234L)),
-        TimestampedValue.of(3, new Instant(-1000L)));
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(createdBundle),
-        new Instant(Long.MAX_VALUE));
-
-    CommittedBundle<KV<String, Integer>> keyBundle = timestampedBundle(keyed,
-        TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
-        TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
-        TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
-    manager.updateWatermarks(
-        createdBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(keyed),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(keyBundle),
-        new Instant(500L));
-    manager.refreshAll();
-    TransformWatermarks keyedWatermarks =
-        manager.getWatermarks(graph.getProducer(keyed));
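-    // All input has been consumed, but the output watermark remains held to the 500L hold.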
-    assertThat(
-        keyedWatermarks.getInputWatermark(), not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    assertThat(keyedWatermarks.getOutputWatermark(), not(greaterThan(new Instant(500L))));
-  }
-
-  /**
-   * Demonstrates that the output watermark of an {@link AppliedPTransform} is held to the
-   * earliest of the per-key watermark holds provided for its keys.
-   */
-  @Test
-  public void updateWatermarkWithKeyedWatermarkHolds() {
-    CommittedBundle<Integer> firstKeyBundle = bundleFactory.createKeyedBundle(
-        StructuralKey.of("Odd", StringUtf8Coder.of()),
-        createdInts)
-        .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(1_000_000L)))
-        .add(WindowedValue.timestampedValueInGlobalWindow(3, new Instant(-1000L)))
-        .commit(clock.now());
-
-    CommittedBundle<Integer> secondKeyBundle = bundleFactory.createKeyedBundle(
-        StructuralKey.of("Even", StringUtf8Coder.of()),
-        createdInts)
-        .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L)))
-        .commit(clock.now());
-
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        ImmutableList.of(firstKeyBundle, secondKeyBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    manager.updateWatermarks(
-        firstKeyBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(filtered),
-        firstKeyBundle.withElements(Collections.emptyList()),
-        Collections.emptyList(),
-        new Instant(-1000L));
-    manager.updateWatermarks(
-        secondKeyBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(filtered),
-        secondKeyBundle.withElements(Collections.emptyList()),
-        Collections.emptyList(),
-        new Instant(1234L));
-    manager.refreshAll();
-
-    TransformWatermarks filteredWatermarks =
-        manager.getWatermarks(graph.getProducer(filtered));
-    assertThat(filteredWatermarks.getInputWatermark(),
-        not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    assertThat(filteredWatermarks.getOutputWatermark(), not(greaterThan(new Instant(-1000L))));
-
-    CommittedBundle<Integer> fauxFirstKeyTimerBundle = bundleFactory.createKeyedBundle(
-        StructuralKey.of("Odd", StringUtf8Coder.of()),
-        createdInts).commit(clock.now());
-    manager.updateWatermarks(
-        fauxFirstKeyTimerBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(filtered),
-        fauxFirstKeyTimerBundle.withElements(Collections.emptyList()),
-        Collections.emptyList(),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-
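-    // With the "Odd" key's hold released, the output watermark advances to the remaining
-    // "Even" key's hold (1234L).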
-    assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L)));
-
-    CommittedBundle<Integer> fauxSecondKeyTimerBundle = bundleFactory.createKeyedBundle(
-        StructuralKey.of("Even", StringUtf8Coder.of()),
-        createdInts).commit(clock.now());
-    manager.updateWatermarks(
-        fauxSecondKeyTimerBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(filtered),
-        fauxSecondKeyTimerBundle.withElements(Collections.emptyList()),
-        Collections.emptyList(),
-        new Instant(5678L));
-    manager.refreshAll();
-    assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L)));
-
-    manager.updateWatermarks(
-        fauxSecondKeyTimerBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(filtered),
-        fauxSecondKeyTimerBundle.withElements(Collections.emptyList()),
-        Collections.emptyList(),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-    assertThat(filteredWatermarks.getOutputWatermark(),
-        not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-  }
-
-  /**
-   * Demonstrates that updated output watermarks are monotonic in the presence of late data, when
-   * called on an {@link AppliedPTransform} that consumes no input.
-   */
-  @Test
-  public void updateOutputWatermarkShouldBeMonotonic() {
-    CommittedBundle<?> firstInput =
-        bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(firstInput),
-        new Instant(0L));
-    manager.refreshAll();
-    TransformWatermarks firstWatermarks =
-        manager.getWatermarks(graph.getProducer(createdInts));
-    assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L)));
-
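-    // Report an earlier watermark (-250L); monotonicity requires that the output watermark not
-    // move backwards.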
-    CommittedBundle<?> secondInput =
-        bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.updateWatermarks(null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(secondInput),
-        new Instant(-250L));
-    manager.refreshAll();
-    TransformWatermarks secondWatermarks =
-        manager.getWatermarks(graph.getProducer(createdInts));
-    assertThat(secondWatermarks.getOutputWatermark(), not(lessThan(new Instant(0L))));
-  }
-
-  /**
-   * Demonstrates that updated output watermarks are monotonic in the presence of watermark holds
-   * that become earlier than a previous watermark hold.
-   */
-  @Test
-  public void updateWatermarkWithHoldsShouldBeMonotonic() {
-    CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
-        TimestampedValue.of(1, new Instant(1_000_000L)),
-        TimestampedValue.of(2, new Instant(1234L)),
-        TimestampedValue.of(3, new Instant(-1000L)));
-    manager.updateWatermarks(null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(createdBundle),
-        new Instant(Long.MAX_VALUE));
-
-    CommittedBundle<KV<String, Integer>> keyBundle =
-        timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
-            TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
-            TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
-    manager.updateWatermarks(
-        createdBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(keyed),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(keyBundle),
-        new Instant(500L));
-    manager.refreshAll();
-    TransformWatermarks keyedWatermarks =
-        manager.getWatermarks(graph.getProducer(keyed));
-    assertThat(
-        keyedWatermarks.getInputWatermark(), not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    assertThat(keyedWatermarks.getOutputWatermark(), not(greaterThan(new Instant(500L))));
-    Instant oldOutputWatermark = keyedWatermarks.getOutputWatermark();
-
-    TransformWatermarks updatedWatermarks =
-        manager.getWatermarks(graph.getProducer(keyed));
-    assertThat(
-        updatedWatermarks.getInputWatermark(), not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    // We added a hold prior to the old watermark; we shouldn't progress (due to the earlier hold)
-    // but the watermark is monotonic and should not backslide to the new, earlier hold
-    assertThat(updatedWatermarks.getOutputWatermark(), equalTo(oldOutputWatermark));
-  }
-
-  @Test
-  public void updateWatermarkWithUnprocessedElements() {
-    WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(1);
-    WindowedValue<Integer> second =
-        WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-1000L));
-    WindowedValue<Integer> third =
-        WindowedValue.timestampedValueInGlobalWindow(3, new Instant(1234L));
-    CommittedBundle<Integer> createdBundle = bundleFactory.createBundle(createdInts)
-        .add(first)
-        .add(second)
-        .add(third)
-        .commit(clock.now());
-
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(createdBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    CommittedBundle<KV<String, Integer>> keyBundle = timestampedBundle(keyed,
-        TimestampedValue.of(KV.of("MyKey", 1), BoundedWindow.TIMESTAMP_MIN_VALUE));
-    manager.updateWatermarks(
-        createdBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(keyed),
-        createdBundle.withElements(ImmutableList.of(second, third)),
-        Collections.<CommittedBundle<?>>singleton(keyBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    TransformWatermarks keyedWatermarks =
-        manager.getWatermarks(graph.getProducer(keyed));
-    // The unprocessed second and third elements are re-added to pending
-    assertThat(
-        keyedWatermarks.getInputWatermark(), not(greaterThan(new Instant(-1000L))));
-  }
-
-  @Test
-  public void updateWatermarkWithCompletedElementsNotPending() {
-    WindowedValue<Integer> first = WindowedValue.timestampedValueInGlobalWindow(1, new Instant(22));
-    CommittedBundle<Integer> createdBundle = bundleFactory.createBundle(createdInts)
-        .add(first)
-        .commit(clock.now());
-
-    WindowedValue<Integer> second =
-        WindowedValue.timestampedValueInGlobalWindow(2, new Instant(22));
-    CommittedBundle<Integer> neverCreatedBundle = bundleFactory.createBundle(createdInts)
-        .add(second)
-        .commit(clock.now());
-
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(createdBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    manager.updateWatermarks(
-        neverCreatedBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(filtered),
-        neverCreatedBundle.withElements(Collections.emptyList()),
-        Collections.emptyList(),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    manager.refreshAll();
-    TransformWatermarks filteredWms =
-        manager.getWatermarks(graph.getProducer(filtered));
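-    // The filtered input watermark is held at 22 by the still-pending element of createdBundle,
-    // which was produced upstream but never consumed.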
-    assertThat(filteredWms.getInputWatermark(), equalTo(new Instant(22L)));
-  }
-
-  /**
-   * Demonstrates that updateWatermarks in the presence of late data is monotonic.
-   */
-  @Test
-  public void updateWatermarkWithLateData() {
-    Instant sourceWatermark = new Instant(1_000_000L);
-    CommittedBundle<Integer> createdBundle =
-        timestampedBundle(
-            createdInts,
-            TimestampedValue.of(1, sourceWatermark),
-            TimestampedValue.of(2, new Instant(1234L)));
-
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(createdBundle),
-        sourceWatermark);
-
-    CommittedBundle<KV<String, Integer>> keyBundle =
-        timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), sourceWatermark),
-            TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)));
-
-    // Finish processing the on-time data. The watermarks should progress to be equal to the
-    // source watermark.
-    manager.updateWatermarks(
-        createdBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(keyed),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(keyBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-    TransformWatermarks onTimeWatermarks =
-        manager.getWatermarks(graph.getProducer(keyed));
-    assertThat(onTimeWatermarks.getInputWatermark(), equalTo(sourceWatermark));
-    assertThat(onTimeWatermarks.getOutputWatermark(), equalTo(sourceWatermark));
-
-    CommittedBundle<Integer> lateDataBundle =
-        timestampedBundle(createdInts, TimestampedValue.of(3, new Instant(-1000L)));
-    // The late data arrives in a downstream PCollection after its watermark has advanced past its
-    // timestamp; the downstream watermark is not advanced further until the late data is consumed.
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(lateDataBundle),
-        new Instant(2_000_000L));
-    manager.refreshAll();
-    TransformWatermarks bufferedLateWm =
-        manager.getWatermarks(graph.getProducer(createdInts));
-    assertThat(bufferedLateWm.getOutputWatermark(), equalTo(new Instant(2_000_000L)));
-
-    // The input watermark should be held to its previous value (not advanced due to the late data,
-    // and not moved backwards, since watermarks are monotonic).
-    TransformWatermarks lateDataBufferedWatermark =
-        manager.getWatermarks(graph.getProducer(keyed));
-    assertThat(lateDataBufferedWatermark.getInputWatermark(), not(lessThan(sourceWatermark)));
-    assertThat(lateDataBufferedWatermark.getOutputWatermark(), not(lessThan(sourceWatermark)));
-
-    CommittedBundle<KV<String, Integer>> lateKeyedBundle =
-        timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
-    manager.updateWatermarks(
-        lateDataBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(keyed),
-        lateDataBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(lateKeyedBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-  }
-
-  @Test
-  @Ignore("https://issues.apache.org/jira/browse/BEAM-4191")
-  public void updateWatermarkWithDifferentWindowedValueInstances() {
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(
-            bundleFactory
-                .createBundle(createdInts)
-                .add(WindowedValue.valueInGlobalWindow(1))
-                .commit(Instant.now())),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    CommittedBundle<Integer> createdBundle = bundleFactory.createBundle(createdInts)
-        .add(WindowedValue.valueInGlobalWindow(1))
-        .commit(Instant.now());
-    manager.updateWatermarks(
-        createdBundle,
-        TimerUpdate.empty(),
-        graph.getProducer(keyed),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.emptyList(),
-        null);
-    manager.refreshAll();
-    TransformWatermarks onTimeWatermarks =
-        manager.getWatermarks(graph.getProducer(keyed));
-    assertThat(onTimeWatermarks.getInputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-  }
-
-  /**
-   * Demonstrates that after the watermarks of an upstream transform are updated but no output has
-   * been produced, the watermarks of a downstream transform are still advanced.
-   */
-  @Test
-  public void getWatermarksAfterOnlyEmptyOutput() {
-    CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts);
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-    TransformWatermarks updatedSourceWatermarks =
-        manager.getWatermarks(graph.getProducer(createdInts));
-
-    assertThat(
-        updatedSourceWatermarks.getOutputWatermark(),
-        not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-
-    TransformWatermarks finishedFilterWatermarks =
-        manager.getWatermarks(graph.getProducer(filtered));
-    assertThat(
-        finishedFilterWatermarks.getInputWatermark(),
-        not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    assertThat(
-        finishedFilterWatermarks.getOutputWatermark(),
-        not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-  }
-
-  /**
-   * Demonstrates that after the watermarks of an upstream transform are updated but no output has
-   * been produced, and the downstream transform has a watermark hold, the downstream output
-   * watermark remains held to that hold.
-   */
-  @Test
-  public void getWatermarksAfterHoldAndEmptyOutput() {
-    CommittedBundle<Integer> firstCreateOutput = multiWindowedBundle(createdInts, 1, 2);
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(firstCreateOutput),
-        new Instant(12_000L));
-
-    CommittedBundle<Integer> firstFilterOutput = multiWindowedBundle(filtered);
-    manager.updateWatermarks(
-        firstCreateOutput,
-        TimerUpdate.empty(),
-        graph.getProducer(filtered),
-        firstCreateOutput.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(firstFilterOutput),
-        new Instant(10_000L));
-    manager.refreshAll();
-    TransformWatermarks firstFilterWatermarks =
-        manager.getWatermarks(graph.getProducer(filtered));
-    assertThat(firstFilterWatermarks.getInputWatermark(), not(lessThan(new Instant(12_000L))));
-    assertThat(firstFilterWatermarks.getOutputWatermark(), not(greaterThan(new Instant(10_000L))));
-
-    CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts);
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-    TransformWatermarks updatedSourceWatermarks =
-        manager.getWatermarks(graph.getProducer(createdInts));
-
-    assertThat(
-        updatedSourceWatermarks.getOutputWatermark(),
-        not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-
-    TransformWatermarks finishedFilterWatermarks =
-        manager.getWatermarks(graph.getProducer(filtered));
-    assertThat(
-        finishedFilterWatermarks.getInputWatermark(),
-        not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    assertThat(
-        finishedFilterWatermarks.getOutputWatermark(), not(greaterThan(new Instant(10_000L))));
-  }
-
-  @Test
-  public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() {
-    TransformWatermarks watermarks =
-        manager.getWatermarks(graph.getProducer(createdInts));
-    assertThat(watermarks.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
-    assertThat(
-        watermarks.getSynchronizedProcessingOutputTime(),
-        equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
-
-    TransformWatermarks filteredWatermarks =
-        manager.getWatermarks(graph.getProducer(filtered));
-    // Non-root processing watermarks don't progress until data has been processed
-    assertThat(
-        filteredWatermarks.getSynchronizedProcessingInputTime(),
-        not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-    assertThat(
-        filteredWatermarks.getSynchronizedProcessingOutputTime(),
-        not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-
-    CommittedBundle<Integer> createOutput =
-        bundleFactory.createBundle(createdInts).commit(new Instant(1250L));
-
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(createOutput),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-    TransformWatermarks createAfterUpdate =
-        manager.getWatermarks(graph.getProducer(createdInts));
-    assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
-    assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
-
-    TransformWatermarks filterAfterProduced =
-        manager.getWatermarks(graph.getProducer(filtered));
-    assertThat(
-        filterAfterProduced.getSynchronizedProcessingInputTime(), not(greaterThan(clock.now())));
-    assertThat(
-        filterAfterProduced.getSynchronizedProcessingOutputTime(), not(greaterThan(clock.now())));
-
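-    // Advancing the clock moves the root transform's synchronized processing time, while the
-    // downstream filter remains held to the pending bundle's commit time (1250L).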
-    clock.set(new Instant(1500L));
-    assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
-    assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
-    assertThat(
-        filterAfterProduced.getSynchronizedProcessingInputTime(),
-        not(greaterThan(new Instant(1250L))));
-    assertThat(
-        filterAfterProduced.getSynchronizedProcessingOutputTime(),
-        not(greaterThan(new Instant(1250L))));
-
-    CommittedBundle<?> filterOutputBundle =
-        bundleFactory.createBundle(intsToFlatten).commit(new Instant(1250L));
-    manager.updateWatermarks(
-        createOutput,
-        TimerUpdate.empty(),
-        graph.getProducer(filtered),
-        createOutput.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(filterOutputBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-    TransformWatermarks filterAfterConsumed =
-        manager.getWatermarks(graph.getProducer(filtered));
-    assertThat(
-        filterAfterConsumed.getSynchronizedProcessingInputTime(),
-        not(greaterThan(createAfterUpdate.getSynchronizedProcessingOutputTime())));
-    assertThat(
-        filterAfterConsumed.getSynchronizedProcessingOutputTime(),
-        not(greaterThan(filterAfterConsumed.getSynchronizedProcessingInputTime())));
-  }
-
-  /**
-   * Demonstrates that the Synchronized Processing Time output watermark cannot progress past
-   * pending timers in the same set. This propagates to all downstream SynchronizedProcessingTimes.
-   *
-   * <p>Also demonstrates that the result is monotonic.
-   */
-  @Test
-  public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
-    CommittedBundle<Integer> createdBundle = multiWindowedBundle(createdInts, 1, 2, 4, 8);
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(createdBundle),
-        new Instant(1248L));
-    manager.refreshAll();
-
-    TransformWatermarks filteredWms =
-        manager.getWatermarks(graph.getProducer(filtered));
-    TransformWatermarks filteredDoubledWms =
-        manager.getWatermarks(graph.getProducer(filteredTimesTwo));
-    Instant initialFilteredWm = filteredWms.getSynchronizedProcessingOutputTime();
-    Instant initialFilteredDoubledWm = filteredDoubledWms.getSynchronizedProcessingOutputTime();
-
-    StructuralKey<String> key = StructuralKey.of("key", StringUtf8Coder.of());
-    CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 8);
-    TimerData pastTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(250L), TimeDomain.PROCESSING_TIME);
-    TimerData futureTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(4096L), TimeDomain.PROCESSING_TIME);
-    TimerUpdate timers =
-        TimerUpdate.builder(key).setTimer(pastTimer).setTimer(futureTimer).build();
-    manager.updateWatermarks(
-        createdBundle,
-        timers,
-        graph.getProducer(filtered),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(filteredBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-    Instant startTime = clock.now();
-    clock.set(startTime.plus(250L));
-    // We're held based on the past timer
-    assertThat(filteredWms.getSynchronizedProcessingOutputTime(), not(greaterThan(startTime)));
-    assertThat(
-        filteredDoubledWms.getSynchronizedProcessingOutputTime(), not(greaterThan(startTime)));
-    // And we're monotonic
-    assertThat(filteredWms.getSynchronizedProcessingOutputTime(), not(lessThan(initialFilteredWm)));
-    assertThat(
-        filteredDoubledWms.getSynchronizedProcessingOutputTime(),
-        not(lessThan(initialFilteredDoubledWm)));
-
-    Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> firedTimers = manager.extractFiredTimers();
-    assertThat(
-        Iterables.getOnlyElement(firedTimers).getTimers(),
-        contains(pastTimer));
-    // Our timer has fired, but has not been completed, so it holds our synchronized processing WM
-    assertThat(filteredWms.getSynchronizedProcessingOutputTime(), not(greaterThan(startTime)));
-    assertThat(
-        filteredDoubledWms.getSynchronizedProcessingOutputTime(), not(greaterThan(startTime)));
-
-    CommittedBundle<Integer> filteredTimerBundle =
-        bundleFactory
-            .createKeyedBundle(key, filtered)
-            .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    CommittedBundle<Integer> filteredTimerResult =
-        bundleFactory.createKeyedBundle(key, filteredTimesTwo)
-            .commit(filteredWms.getSynchronizedProcessingOutputTime());
-    // Complete the processing time timer
-    manager.updateWatermarks(
-        filteredTimerBundle,
-        TimerUpdate.builder(key).withCompletedTimers(Collections.singleton(pastTimer)).build(),
-        graph.getProducer(filtered),
-        filteredTimerBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(filteredTimerResult),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-
-    clock.set(startTime.plus(500L));
-    assertThat(filteredWms.getSynchronizedProcessingOutputTime(), not(greaterThan(clock.now())));
-    // The downstream filteredTimesTwo should be held to the synchronized processing output time
-    // of the pending filteredTimerResult bundle
-    assertThat(
-        filteredDoubledWms.getSynchronizedProcessingOutputTime(),
-        not(lessThan(filteredTimerResult.getSynchronizedProcessingOutputWatermark())));
-
-    manager.updateWatermarks(
-        filteredTimerResult,
-        TimerUpdate.empty(),
-        graph.getProducer(filteredTimesTwo),
-        filteredTimerResult.withElements(Collections.emptyList()),
-        Collections.emptyList(),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-    assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
-
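-    // Even at the end of time, the output time is held by the still-pending future processing-time
-    // timer at 4096.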
-    clock.set(new Instant(Long.MAX_VALUE));
-    assertThat(filteredWms.getSynchronizedProcessingOutputTime(), equalTo(new Instant(4096)));
-    assertThat(
-        filteredDoubledWms.getSynchronizedProcessingOutputTime(),
-        not(greaterThan(new Instant(4096))));
-  }
-
-  /**
-   * Demonstrates that the synchronized processing time output hold is monotonic, even if an
-   * earlier processing hold appears after a later one.
-   */
-  @Test
-  public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() {
-    Instant startTime = clock.now();
-    TransformWatermarks watermarks =
-        manager.getWatermarks(graph.getProducer(createdInts));
-    assertThat(watermarks.getSynchronizedProcessingInputTime(), equalTo(startTime));
-
-    TransformWatermarks filteredWatermarks =
-        manager.getWatermarks(graph.getProducer(filtered));
-    // Non-root processing watermarks don't progress until data has been processed
-    assertThat(
-        filteredWatermarks.getSynchronizedProcessingInputTime(),
-        not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-    assertThat(
-        filteredWatermarks.getSynchronizedProcessingOutputTime(),
-        not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-
-    CommittedBundle<Integer> createOutput =
-        bundleFactory.createBundle(createdInts).commit(new Instant(1250L));
-
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(createOutput),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-    TransformWatermarks createAfterUpdate = manager.getWatermarks(graph.getProducer(createdInts));
-    assertThat(
-        createAfterUpdate.getSynchronizedProcessingInputTime(), not(greaterThan(clock.now())));
-    assertThat(
-        createAfterUpdate.getSynchronizedProcessingOutputTime(), not(greaterThan(clock.now())));
-
-    CommittedBundle<Integer> createSecondOutput =
-        bundleFactory.createBundle(createdInts).commit(new Instant(750L));
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(createSecondOutput),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-
-    assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
-  }
-
-  @Test
-  public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers() {
-    CommittedBundle<Integer> created = multiWindowedBundle(createdInts, 1, 2, 3);
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(created),
-        new Instant(40_900L));
-    manager.refreshAll();
-
-    CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 4);
-    Instant upstreamHold = new Instant(2048L);
-    TimerData upstreamProcessingTimer =
-        TimerData.of(StateNamespaces.global(), upstreamHold, TimeDomain.PROCESSING_TIME);
-    manager.updateWatermarks(
-        created,
-        TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
-            .setTimer(upstreamProcessingTimer)
-            .build(),
-        graph.getProducer(filtered),
-        created.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(filteredBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-
-    TransformWatermarks downstreamWms = manager.getWatermarks(graph.getProducer(filteredTimesTwo));
-    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
-
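-    // Once the clock passes the upstream processing-time timer, the downstream input time is held
-    // to that timer's timestamp (2048L).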
-    clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(upstreamHold));
-
-    manager.extractFiredTimers();
-    // Pending processing time timers that have been fired but aren't completed hold the
-    // synchronized processing time
-    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(upstreamHold));
-
-    CommittedBundle<Integer> otherCreated = multiWindowedBundle(createdInts, 4, 8, 12);
-    manager.updateWatermarks(
-        otherCreated,
-        TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
-            .withCompletedTimers(Collections.singleton(upstreamProcessingTimer))
-            .build(),
-        graph.getProducer(filtered),
-        otherCreated.withElements(Collections.emptyList()),
-        Collections.emptyList(),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-
-    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), not(lessThan(clock.now())));
-  }
-
-  @Test
-  public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() {
-    CommittedBundle<Integer> created = multiWindowedBundle(createdInts, 1, 2, 3);
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.<CommittedBundle<?>>singleton(created),
-        new Instant(29_919_235L));
-
-    Instant upstreamHold = new Instant(2048L);
-    CommittedBundle<Integer> filteredBundle = bundleFactory.createKeyedBundle(
-        StructuralKey.of("key", StringUtf8Coder.of()),
-        filtered).commit(upstreamHold);
-    manager.updateWatermarks(
-        created,
-        TimerUpdate.empty(),
-        graph.getProducer(filtered),
-        created.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(filteredBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.refreshAll();
-
-    TransformWatermarks downstreamWms = manager.getWatermarks(graph.getProducer(filteredTimesTwo));
-    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
-
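-    // Once the clock advances, the downstream input time is held to the commit time of the
-    // pending keyed bundle (2048L).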
-    clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(upstreamHold));
-  }
-
-  @Test
-  public void extractFiredTimersReturnsFiredEventTimeTimers() {
-    Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> initialTimers =
-        manager.extractFiredTimers();
-    // Watermarks haven't advanced
-    assertThat(initialTimers, emptyIterable());
-
-    // Advance the watermark past the first timer, but not past the second and third
-    CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.singleton(createdBundle),
-        new Instant(1500L));
-    manager.refreshAll();
-
-    TimerData earliestTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(1000), TimeDomain.EVENT_TIME);
-    TimerData middleTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(5000L), TimeDomain.EVENT_TIME);
-    TimerData lastTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(10000L), TimeDomain.EVENT_TIME);
-    StructuralKey<byte[]> key = StructuralKey.of(new byte[] {1, 4, 9}, ByteArrayCoder.of());
-    TimerUpdate update =
-        TimerUpdate.builder(key)
-            .setTimer(earliestTimer)
-            .setTimer(middleTimer)
-            .setTimer(lastTimer)
-            .build();
-
-    manager.updateWatermarks(
-        createdBundle,
-        update,
-        graph.getProducer(filtered),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
-        new Instant(1000L));
-    manager.refreshAll();
-
-    Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> firstFiredTimers =
-        manager.extractFiredTimers();
-    assertThat(firstFiredTimers, not(Matchers.emptyIterable()));
-    FiredTimers<AppliedPTransform<?, ?, ?>> firstFired = Iterables.getOnlyElement(firstFiredTimers);
-    assertThat(firstFired.getTimers(), contains(earliestTimer));
-
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.emptyList(),
-        new Instant(50_000L));
-    manager.refreshAll();
-    Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> secondFiredTimers =
-        manager.extractFiredTimers();
-    assertThat(secondFiredTimers, not(Matchers.emptyIterable()));
-    FiredTimers<AppliedPTransform<?, ?, ?>> secondFired =
-        Iterables.getOnlyElement(secondFiredTimers);
-    // Contains, in order, middleTimer and then lastTimer
-    assertThat(secondFired.getTimers(), contains(middleTimer, lastTimer));
-  }
-
-  @Test
-  public void extractFiredTimersReturnsFiredProcessingTimeTimers() {
-    Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> initialTimers =
-        manager.extractFiredTimers();
-    // Watermarks haven't advanced
-    assertThat(initialTimers, emptyIterable());
-
-    // Advance the watermark past the first timer, but not past the second and third
-    CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.singleton(createdBundle),
-        new Instant(1500L));
-
-    TimerData earliestTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(999L), TimeDomain.PROCESSING_TIME);
-    TimerData middleTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(5000L), TimeDomain.PROCESSING_TIME);
-    TimerData lastTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(10000L), TimeDomain.PROCESSING_TIME);
-    StructuralKey<?> key = StructuralKey.of(-12L, VarLongCoder.of());
-    TimerUpdate update =
-        TimerUpdate.builder(key)
-            .setTimer(lastTimer)
-            .setTimer(earliestTimer)
-            .setTimer(middleTimer)
-            .build();
-
-    manager.updateWatermarks(
-        createdBundle,
-        update,
-        graph.getProducer(filtered),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
-        new Instant(1000L));
-    manager.refreshAll();
-
-    Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> firstFiredTimers =
-        manager.extractFiredTimers();
-    assertThat(firstFiredTimers, not(Matchers.emptyIterable()));
-    FiredTimers<AppliedPTransform<?, ?, ?>> firstFired = Iterables.getOnlyElement(firstFiredTimers);
-    assertThat(firstFired.getTimers(), contains(earliestTimer));
-
-    clock.set(new Instant(50_000L));
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.emptyList(),
-        new Instant(50_000L));
-    manager.refreshAll();
-    Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> secondFiredTimers =
-        manager.extractFiredTimers();
-    assertThat(secondFiredTimers, not(Matchers.emptyIterable()));
-    FiredTimers<AppliedPTransform<?, ?, ?>> secondFired =
-        Iterables.getOnlyElement(secondFiredTimers);
-    // Contains, in order, middleTimer and then lastTimer
-    assertThat(secondFired.getTimers(), contains(middleTimer, lastTimer));
-  }
-
-  @Test
-  public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() {
-    Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> initialTimers =
-        manager.extractFiredTimers();
-    // Watermarks haven't advanced
-    assertThat(initialTimers, emptyIterable());
-
-    // Advance the watermark past the first timer, but not past the second and third
-    CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.singleton(createdBundle),
-        new Instant(1500L));
-
-    TimerData earliestTimer = TimerData.of(
-        StateNamespaces.global(), new Instant(999L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    TimerData middleTimer = TimerData.of(
-        StateNamespaces.global(), new Instant(5000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    TimerData lastTimer = TimerData.of(
-        StateNamespaces.global(), new Instant(10000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    StructuralKey<byte[]> key = StructuralKey.of(new byte[] {2, -2, 22}, ByteArrayCoder.of());
-    TimerUpdate update =
-        TimerUpdate.builder(key)
-            .setTimer(lastTimer)
-            .setTimer(earliestTimer)
-            .setTimer(middleTimer)
-            .build();
-
-    manager.updateWatermarks(
-        createdBundle,
-        update,
-        graph.getProducer(filtered),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
-        new Instant(1000L));
-    manager.refreshAll();
-
-    Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> firstFiredTimers =
-        manager.extractFiredTimers();
-    assertThat(firstFiredTimers, not(Matchers.emptyIterable()));
-    FiredTimers<AppliedPTransform<?, ?, ?>> firstFired = Iterables.getOnlyElement(firstFiredTimers);
-    assertThat(firstFired.getTimers(), contains(earliestTimer));
-
-    clock.set(new Instant(50_000L));
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.emptyList(),
-        new Instant(50_000L));
-    manager.refreshAll();
-    Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> secondFiredTimers =
-        manager.extractFiredTimers();
-    assertThat(secondFiredTimers, not(Matchers.emptyIterable()));
-    FiredTimers<AppliedPTransform<?, ?, ?>> secondFired =
-        Iterables.getOnlyElement(secondFiredTimers);
-    // Contains, in order, middleTimer and then lastTimer
-    assertThat(secondFired.getTimers(), contains(middleTimer, lastTimer));
-  }
-
-  @Test
-  public void processingTimeTimersCanBeReset() {
-    Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> initialTimers =
-        manager.extractFiredTimers();
-    assertThat(initialTimers, emptyIterable());
-
-    String timerId = "myTimer";
-    StructuralKey<?> key = StructuralKey.of(-12L, VarLongCoder.of());
-
-    TimerData initialTimer =
-        TimerData.of(timerId, StateNamespaces.global(), new Instant(5000L),
-            TimeDomain.PROCESSING_TIME);
-
-    TimerData overridingTimer =
-        TimerData.of(timerId, StateNamespaces.global(), new Instant(10000L),
-            TimeDomain.PROCESSING_TIME);
-
-    TimerUpdate initialUpdate = TimerUpdate.builder(key).setTimer(initialTimer).build();
-    TimerUpdate overridingUpdate = TimerUpdate.builder(key).setTimer(overridingTimer).build();
-
-    manager.updateWatermarks(
-        null,
-        initialUpdate,
-        graph.getProducer(createdInts),
-        null,
-        Collections.emptyList(),
-        new Instant(5000L));
-    manager.refreshAll();
-
-    // This update should override the previous timer.
-    manager.updateWatermarks(
-        null,
-        overridingUpdate,
-        graph.getProducer(createdInts),
-        null,
-        Collections.emptyList(),
-        new Instant(10000L));
-
-    // Set clock past the timers.
-    clock.set(new Instant(50000L));
-    manager.refreshAll();
-
-    Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> firedTimers =
-        manager.extractFiredTimers();
-    assertThat(firedTimers, not(Matchers.emptyIterable()));
-    FiredTimers<AppliedPTransform<?, ?, ?>> timers = Iterables.getOnlyElement(firedTimers);
-    assertThat(timers.getTimers(), contains(overridingTimer));
-  }
-
-  @Test
-  public void eventTimeTimersCanBeReset() {
-    Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> initialTimers =
-        manager.extractFiredTimers();
-    assertThat(initialTimers, emptyIterable());
-
-    String timerId = "myTimer";
-    StructuralKey<?> key = StructuralKey.of(-12L, VarLongCoder.of());
-
-    TimerData initialTimer = TimerData
-        .of(timerId, StateNamespaces.global(), new Instant(1000L), TimeDomain.EVENT_TIME);
-    TimerData overridingTimer = TimerData
-        .of(timerId, StateNamespaces.global(), new Instant(2000L), TimeDomain.EVENT_TIME);
-
-    TimerUpdate initialUpdate = TimerUpdate.builder(key).setTimer(initialTimer).build();
-    TimerUpdate overridingUpdate = TimerUpdate.builder(key).setTimer(overridingTimer).build();
-
-    CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
-    manager.updateWatermarks(
-        createdBundle,
-        initialUpdate,
-        graph.getProducer(filtered),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
-        new Instant(1000L));
-    manager.refreshAll();
-
-    // This update should override the previous timer.
-    manager.updateWatermarks(
-        createdBundle,
-        overridingUpdate,
-        graph.getProducer(filtered),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
-        new Instant(1000L));
-    manager.refreshAll();
-
-    // Set WM past the timers.
-    manager.updateWatermarks(
-        null,
-        TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        null,
-        Collections.singleton(createdBundle),
-        new Instant(3000L));
-    manager.refreshAll();
-
-    Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> firstFiredTimers =
-        manager.extractFiredTimers();
-    assertThat(firstFiredTimers, not(Matchers.emptyIterable()));
-    FiredTimers<AppliedPTransform<?, ?, ?>> firstFired = Iterables.getOnlyElement(firstFiredTimers);
-    assertThat(firstFired.getTimers(), contains(overridingTimer));
-  }
-
-  @Test
-  public void inputWatermarkDuplicates() {
-    Watermark mockWatermark = Mockito.mock(Watermark.class);
-
-    AppliedPTransformInputWatermark underTest =
-        new AppliedPTransformInputWatermark(ImmutableList.of(mockWatermark));
-
-    // Refresh
-    when(mockWatermark.get()).thenReturn(new Instant(0));
-    underTest.refresh();
-    assertEquals(new Instant(0), underTest.get());
-
-    // Apply a timer update
-    StructuralKey<String> key = StructuralKey.of("key", StringUtf8Coder.of());
-    TimerData timer1 = TimerData
-        .of("a", StateNamespaces.global(), new Instant(100), TimeDomain.EVENT_TIME);
-    TimerData timer2 = TimerData
-        .of("a", StateNamespaces.global(), new Instant(200), TimeDomain.EVENT_TIME);
-    underTest.updateTimers(TimerUpdate.builder(key).setTimer(timer1).setTimer(timer2).build());
-
-    // Only the last timer update should be observable
-    assertEquals(timer2.getTimestamp(), underTest.getEarliestTimerTimestamp());
-
-    // Advance the input watermark
-    when(mockWatermark.get()).thenReturn(new Instant(1000));
-    underTest.refresh();
-    assertEquals(new Instant(1000), underTest.get()); // input watermark is not held by timers
-
-    // Examine the fired event time timers
-    Map<StructuralKey<?>, List<TimerData>> fired = underTest.extractFiredEventTimeTimers();
-    List<TimerData> timers = fired.get(key);
-    assertNotNull(timers);
-    assertThat(timers, contains(timer2));
-
-    // Update based on timer firings
-    underTest.updateTimers(TimerUpdate.builder(key)
-        .withCompletedTimers(timers).build());
-
-    // Now we should be able to advance
-    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, underTest.getEarliestTimerTimestamp());
-
-    // Nothing left to fire
-    fired = underTest.extractFiredEventTimeTimers();
-    assertThat(fired.entrySet(), empty());
-  }
-
-  @Test
-  public void timerUpdateBuilderBuildAddsAllAddedTimers() {
-    TimerData set = TimerData.of(StateNamespaces.global(), new Instant(10L), TimeDomain.EVENT_TIME);
-    TimerData deleted =
-        TimerData.of(StateNamespaces.global(), new Instant(24L), TimeDomain.PROCESSING_TIME);
-    TimerData completedOne = TimerData.of(
-        StateNamespaces.global(), new Instant(1024L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    TimerData completedTwo =
-        TimerData.of(StateNamespaces.global(), new Instant(2048L), TimeDomain.EVENT_TIME);
-
-    TimerUpdate update =
-        TimerUpdate.builder(StructuralKey.of("foo", StringUtf8Coder.of()))
-            .withCompletedTimers(ImmutableList.of(completedOne, completedTwo))
-            .setTimer(set)
-            .deletedTimer(deleted)
-            .build();
-
-    assertThat(update.getCompletedTimers(), containsInAnyOrder(completedOne, completedTwo));
-    assertThat(update.getSetTimers(), contains(set));
-    assertThat(update.getDeletedTimers(), contains(deleted));
-  }
-
-  @Test
-  public void timerUpdateBuilderWithSetAtEndOfTime() {
-    Instant timerStamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
-    TimerData tooFar = TimerData.of(StateNamespaces.global(), timerStamp, TimeDomain.EVENT_TIME);
-
-    TimerUpdateBuilder builder = TimerUpdate.builder(StructuralKey.empty());
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(timerStamp.toString());
-    builder.setTimer(tooFar);
-  }
-
-  @Test
-  public void timerUpdateBuilderWithSetPastEndOfTime() {
-    Instant timerStamp = BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.standardMinutes(2));
-    TimerData tooFar = TimerData.of(StateNamespaces.global(), timerStamp, TimeDomain.EVENT_TIME);
-
-    TimerUpdateBuilder builder = TimerUpdate.builder(StructuralKey.empty());
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(timerStamp.toString());
-    builder.setTimer(tooFar);
-  }
-
-  @Test
-  public void timerUpdateBuilderWithSetThenDeleteHasOnlyDeleted() {
-    TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
-
-    TimerUpdate built = builder.setTimer(timer).deletedTimer(timer).build();
-
-    assertThat(built.getSetTimers(), emptyIterable());
-    assertThat(built.getDeletedTimers(), contains(timer));
-  }
-
-  @Test
-  public void timerUpdateBuilderWithDeleteThenSetHasOnlySet() {
-    TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
-
-    TimerUpdate built = builder.deletedTimer(timer).setTimer(timer).build();
-
-    assertThat(built.getSetTimers(), contains(timer));
-    assertThat(built.getDeletedTimers(), emptyIterable());
-  }
-
-  @Test
-  public void timerUpdateBuilderWithSetAfterBuildNotAddedToBuilt() {
-    TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
-
-    TimerUpdate built = builder.build();
-    builder.setTimer(timer);
-    assertThat(built.getSetTimers(), emptyIterable());
-    builder.build();
-    assertThat(built.getSetTimers(), emptyIterable());
-  }
-
-  @Test
-  public void timerUpdateBuilderWithDeleteAfterBuildNotAddedToBuilt() {
-    TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
-
-    TimerUpdate built = builder.build();
-    builder.deletedTimer(timer);
-    assertThat(built.getDeletedTimers(), emptyIterable());
-    builder.build();
-    assertThat(built.getDeletedTimers(), emptyIterable());
-  }
-
-  @Test
-  public void timerUpdateBuilderWithCompletedAfterBuildNotAddedToBuilt() {
-    TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
-
-    TimerUpdate built = builder.build();
-    builder.withCompletedTimers(ImmutableList.of(timer));
-    assertThat(built.getCompletedTimers(), emptyIterable());
-    builder.build();
-    assertThat(built.getCompletedTimers(), emptyIterable());
-  }
-
-  @Test
-  public void timerUpdateWithCompletedTimersNotAddedToExisting() {
-    TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
-
-    TimerUpdate built = builder.build();
-    assertThat(built.getCompletedTimers(), emptyIterable());
-    assertThat(
-        built.withCompletedTimers(ImmutableList.of(timer)).getCompletedTimers(), contains(timer));
-    assertThat(built.getCompletedTimers(), emptyIterable());
-  }
-
-  @SafeVarargs
-  private final <T> CommittedBundle<T> timestampedBundle(
-      PCollection<T> pc, TimestampedValue<T>... values) {
-    UncommittedBundle<T> bundle = bundleFactory.createBundle(pc);
-    for (TimestampedValue<T> value : values) {
-      bundle.add(
-          WindowedValue.timestampedValueInGlobalWindow(value.getValue(), value.getTimestamp()));
-    }
-    return bundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-  }
-
-  @SafeVarargs
-  private final <T> CommittedBundle<T> multiWindowedBundle(PCollection<T> pc, T... values) {
-    UncommittedBundle<T> bundle = bundleFactory.createBundle(pc);
-    Collection<BoundedWindow> windows =
-        ImmutableList.of(
-            GlobalWindow.INSTANCE,
-            new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(0)));
-    for (T value : values) {
-      bundle.add(
-          WindowedValue.of(value, BoundedWindow.TIMESTAMP_MIN_VALUE, windows, PaneInfo.NO_FIRING));
-    }
-    return bundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-  }
-}
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/WindowEvaluatorFactoryTest.java
index e569b7f8162..bf13f3b9553 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/WindowEvaluatorFactoryTest.java
@@ -29,28 +29,27 @@
 import com.google.common.collect.Iterables;
 import java.util.Collection;
 import java.util.Collections;
-import org.apache.beam.runners.direct.DirectGraphs;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -60,10 +59,11 @@
 
 /** Tests for {@link WindowEvaluatorFactory}. */
 @RunWith(JUnit4.class)
+@Ignore("TODO BEAM-4241 Not yet migrated")
 public class WindowEvaluatorFactoryTest {
   private static final Instant EPOCH = new Instant(0);
 
-  private PCollection<Long> input;
+  private PCollectionNode input;
   private WindowEvaluatorFactory factory;
 
   @Mock private EvaluationContext evaluationContext;
@@ -98,7 +98,9 @@
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
-    input = p.apply(Create.of(1L, 2L, 3L));
+    input =
+        PipelineNode.pCollection(
+            "created", RunnerApi.PCollection.newBuilder().setUniqueName("created").build());
 
     bundleFactory = ImmutableListBundleFactory.create();
     factory = new WindowEvaluatorFactory(evaluationContext);
@@ -107,17 +109,15 @@ public void setup() {
   @Test
   public void singleWindowFnSucceeds() throws Exception {
     Duration windowDuration = Duration.standardDays(7);
-    Window<Long> transform = Window.into(FixedWindows.of(windowDuration));
-    PCollection<Long> windowed = input.apply(transform);
-
     CommittedBundle<Long> inputBundle = createInputBundle();
 
-    UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle);
+    PCollectionNode windowed = null;
+    UncommittedBundle<Long> outputBundle = createOutputBundle(windowed);
 
     BoundedWindow firstSecondWindow = new IntervalWindow(EPOCH, EPOCH.plus(windowDuration));
     BoundedWindow thirdWindow = new IntervalWindow(EPOCH.minus(windowDuration), EPOCH);
 
-    TransformResult<Long> result = runEvaluator(windowed, inputBundle);
+    TransformResult<Long> result = runEvaluator(inputBundle);
 
     assertThat(Iterables.getOnlyElement(result.getOutputBundles()), Matchers.equalTo(outputBundle));
     CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
@@ -144,13 +144,13 @@ public void singleWindowFnSucceeds() throws Exception {
   public void multipleWindowsWindowFnSucceeds() throws Exception {
     Duration windowDuration = Duration.standardDays(6);
     Duration slidingBy = Duration.standardDays(3);
-    Window<Long> transform = Window.into(SlidingWindows.of(windowDuration).every(slidingBy));
-    PCollection<Long> windowed = input.apply(transform);
 
     CommittedBundle<Long> inputBundle = createInputBundle();
-    UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle);
 
-    TransformResult<Long> result = runEvaluator(windowed, inputBundle);
+    PCollectionNode windowed = null;
+    UncommittedBundle<Long> outputBundle = createOutputBundle(windowed);
+
+    TransformResult<Long> result = runEvaluator(inputBundle);
 
     assertThat(Iterables.getOnlyElement(result.getOutputBundles()), Matchers.equalTo(outputBundle));
     CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
@@ -199,13 +199,12 @@ public void multipleWindowsWindowFnSucceeds() throws Exception {
 
   @Test
   public void referencesEarlierWindowsSucceeds() throws Exception {
-    Window<Long> transform = Window.into(new EvaluatorTestWindowFn());
-    PCollection<Long> windowed = input.apply(transform);
-
     CommittedBundle<Long> inputBundle = createInputBundle();
-    UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle);
 
-    TransformResult<Long> result = runEvaluator(windowed, inputBundle);
+    PCollectionNode windowed = null;
+    UncommittedBundle<Long> outputBundle = createOutputBundle(windowed);
+
+    TransformResult<Long> result = runEvaluator(inputBundle);
 
     assertThat(Iterables.getOnlyElement(result.getOutputBundles()), Matchers.equalTo(outputBundle));
     CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
@@ -252,7 +251,7 @@ public void referencesEarlierWindowsSucceeds() throws Exception {
   private CommittedBundle<Long> createInputBundle() {
     CommittedBundle<Long> inputBundle =
         bundleFactory
-            .createBundle(input)
+            .<Long>createBundle(input)
             .add(valueInGlobalWindow)
             .add(valueInGlobalAndTwoIntervalWindows)
             .add(valueInIntervalWindow)
@@ -260,23 +259,21 @@ public void referencesEarlierWindowsSucceeds() throws Exception {
     return inputBundle;
   }
 
-  private UncommittedBundle<Long> createOutputBundle(
-      PCollection<Long> output, CommittedBundle<Long> inputBundle) {
+  private UncommittedBundle<Long> createOutputBundle(PCollectionNode output) {
     UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(output);
-    when(evaluationContext.createBundle(output)).thenReturn(outputBundle);
-    return outputBundle;
+    when(evaluationContext.<Long>createBundle(output)).thenReturn(outputBundle);
+    throw new UnsupportedOperationException("Not yet migrated");
   }
 
-  private TransformResult<Long> runEvaluator(
-      PCollection<Long> windowed, CommittedBundle<Long> inputBundle) throws Exception {
-    TransformEvaluator<Long> evaluator =
-        factory.forApplication(DirectGraphs.getProducer(windowed), inputBundle);
+  private TransformResult<Long> runEvaluator(CommittedBundle<Long> inputBundle) throws Exception {
+    PTransformNode window = null;
+    TransformEvaluator<Long> evaluator = factory.forApplication(window, inputBundle);
 
     evaluator.processElement(valueInGlobalWindow);
     evaluator.processElement(valueInGlobalAndTwoIntervalWindows);
     evaluator.processElement(valueInIntervalWindow);
     TransformResult<Long> result = evaluator.finishBundle();
-    return result;
+    throw new UnsupportedOperationException("Not yet migrated");
   }
 
   private static class EvaluatorTestWindowFn extends NonMergingWindowFn<Long, BoundedWindow> {
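
The quoted diff ends above. As context for the migration it shows, the recurring pattern is that the tests stop holding SDK PCollection objects and instead reference the graph through PipelineNode wrappers around RunnerApi protos. Below is a minimal, self-contained sketch of that pattern; it is not part of the PR, and the PipelineNode.pTransform factory plus the RunnerApi.PTransform builder calls are assumed to mirror the PipelineNode.pCollection usage visible in the diff.

/*
 * Minimal sketch (not part of the PR): referencing pipeline elements through
 * PipelineNode wrappers around RunnerApi protos instead of SDK PCollection types.
 * PipelineNode.pTransform and the proto builder calls are assumptions that mirror
 * the PipelineNode.pCollection call shown in the diff above.
 */
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;

public class PortableGraphSketch {
  public static void main(String[] args) {
    // A PCollection is identified purely by an id and its RunnerApi proto.
    PCollectionNode created =
        PipelineNode.pCollection(
            "created", RunnerApi.PCollection.newBuilder().setUniqueName("created").build());

    // A transform node wraps the proto describing the transform, referencing its
    // input PCollection by id rather than holding an SDK object.
    PTransformNode window =
        PipelineNode.pTransform(
            "window",
            RunnerApi.PTransform.newBuilder()
                .setUniqueName("window")
                .putInputs("input", created.getId())
                .build());

    System.out.println(window.getId() + " consumes " + created.getId());
  }
}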


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 99784)
    Time Spent: 4h 10m  (was: 4h)

> The DirectRunner should interact with a Pipeline via an abstraction of the Graph rather than SDK types
> ------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-4073
>                 URL: https://issues.apache.org/jira/browse/BEAM-4073
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>          Time Spent: 4h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
