beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-4135) Remove Use of Java SDK Types in the DirectRunner "engine"
Date Fri, 20 Apr 2018 03:57:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-4135?focusedWorklogId=93061&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93061 ]

ASF GitHub Bot logged work on BEAM-4135:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Apr/18 03:56
            Start Date: 20/Apr/18 03:56
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #5177: [BEAM-4135] Stop taking the whole result in WatermarkManager
URL: https://github.com/apache/beam/pull/5177
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it has been merged, the diff is reproduced below for the sake of
provenance:

diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 8f0dd423125..bfa65cd2d8e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -31,7 +31,6 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import javax.annotation.Nullable;
 import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
@@ -141,7 +140,7 @@ public void initialize(
    * @return the committed bundles contained within the handled {@code result}
    */
   public CommittedResult<AppliedPTransform<?, ?, ?>> handleResult(
-      @Nullable CommittedBundle<?> completedBundle,
+      CommittedBundle<?> completedBundle,
       Iterable<TimerData> completedTimers,
       TransformResult<?> result) {
     Iterable<? extends CommittedBundle<?>> committedBundles =
@@ -162,9 +161,7 @@ public void initialize(
     CopyOnAccessInMemoryStateInternals theirState = result.getState();
     if (theirState != null) {
       CopyOnAccessInMemoryStateInternals committedState = theirState.commit();
-      StepAndKey stepAndKey =
-          StepAndKey.of(
-              result.getTransform(), completedBundle == null ? null : completedBundle.getKey());
+      StepAndKey stepAndKey = StepAndKey.of(result.getTransform(), completedBundle.getKey());
       if (!committedState.isEmpty()) {
         applicationStateInternals.put(stepAndKey, committedState);
       } else {
@@ -176,7 +173,9 @@ public void initialize(
     watermarkManager.updateWatermarks(
         completedBundle,
         result.getTimerUpdate().withCompletedTimers(completedTimers),
-        committedResult,
+        committedResult.getExecutable(),
+        committedResult.getUnprocessedInputs().orNull(),
+        committedResult.getOutputs(),
         result.getWatermarkHold());
     return committedResult;
   }
@@ -188,7 +187,7 @@ public void initialize(
    * {@link Optional}.
    */
   private Optional<? extends CommittedBundle<?>> getUnprocessedInput(
-      @Nullable CommittedBundle<?> completedBundle, TransformResult<?> result) {
+      CommittedBundle<?> completedBundle, TransformResult<?> result) {
     if (completedBundle == null || Iterables.isEmpty(result.getUnprocessedElements())) {
       return Optional.absent();
     }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
index 9ada00e5c8f..e3632697164 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
@@ -162,12 +162,12 @@ private void fireTimers() {
                     transformTimers.getKey(),
                     (PCollection)
                         Iterables.getOnlyElement(
-                            transformTimers.getTransform().getInputs().values()))
+                            transformTimers.getExecutable().getInputs().values()))
                 .add(WindowedValue.valueInGlobalWindow(work))
                 .commit(evaluationContext.now());
         outstandingWork.incrementAndGet();
         bundleProcessor.process(
-            bundle, transformTimers.getTransform(), new TimerIterableCompletionCallback(delivery));
+            bundle, transformTimers.getExecutable(), new TimerIterableCompletionCallback(delivery));
         state.set(ExecutorState.ACTIVE);
       }
     } catch (Exception e) {
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 882bdc5cbc9..86e904655ce 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -74,9 +74,9 @@
  * {@link AppliedPTransform AppliedPTransforms} and a map of {@link PCollection PCollections} to
  * all the {@link AppliedPTransform AppliedPTransforms} that consume them at construction time.
  *
- * <p>Whenever a root {@link AppliedPTransform transform} produces elements, the
+ * <p>Whenever a root {@link AppliedPTransform executable} produces elements, the
  * {@link WatermarkManager} is provided with the produced elements and the output watermark
- * of the producing {@link AppliedPTransform transform}. The
+ * of the producing {@link AppliedPTransform executable}. The
  * {@link WatermarkManager watermark manager} is responsible for computing the watermarks
  * of all {@link AppliedPTransform transforms} that consume one or more
  * {@link PCollection PCollections}.
@@ -813,35 +813,35 @@ private TransformWatermarks getValueWatermark(CollectionT value) {
     return getTransformWatermark(graph.getProducer(value));
   }
 
-  private TransformWatermarks getTransformWatermark(ExecutableT transform) {
-    TransformWatermarks wms = transformToWatermarks.get(transform);
+  private TransformWatermarks getTransformWatermark(ExecutableT executable) {
+    TransformWatermarks wms = transformToWatermarks.get(executable);
     if (wms == null) {
-      List<Watermark> inputCollectionWatermarks = getInputWatermarks(transform);
+      List<Watermark> inputCollectionWatermarks = getInputWatermarks(executable);
       AppliedPTransformInputWatermark inputWatermark =
           new AppliedPTransformInputWatermark(inputCollectionWatermarks);
       AppliedPTransformOutputWatermark outputWatermark =
           new AppliedPTransformOutputWatermark(inputWatermark);
 
       SynchronizedProcessingTimeInputWatermark inputProcessingWatermark =
-          new SynchronizedProcessingTimeInputWatermark(getInputProcessingWatermarks(transform));
+          new SynchronizedProcessingTimeInputWatermark(getInputProcessingWatermarks(executable));
       SynchronizedProcessingTimeOutputWatermark outputProcessingWatermark =
           new SynchronizedProcessingTimeOutputWatermark(inputProcessingWatermark);
 
       wms =
           new TransformWatermarks(
-              transform,
+              executable,
               inputWatermark,
               outputWatermark,
               inputProcessingWatermark,
               outputProcessingWatermark);
-      transformToWatermarks.put(transform, wms);
+      transformToWatermarks.put(executable, wms);
     }
     return wms;
   }
 
-  private Collection<Watermark> getInputProcessingWatermarks(ExecutableT transform) {
+  private Collection<Watermark> getInputProcessingWatermarks(ExecutableT executable) {
     ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
-    Collection<CollectionT> inputs = graph.getPerElementInputs(transform);
+    Collection<CollectionT> inputs = graph.getPerElementInputs(executable);
     if (inputs.isEmpty()) {
       inputWmsBuilder.add(THE_END_OF_TIME);
     }
@@ -853,9 +853,9 @@ private TransformWatermarks getTransformWatermark(ExecutableT transform) {
     return inputWmsBuilder.build();
   }
 
-  private List<Watermark> getInputWatermarks(ExecutableT transform) {
+  private List<Watermark> getInputWatermarks(ExecutableT executable) {
     ImmutableList.Builder<Watermark> inputWatermarksBuilder = ImmutableList.builder();
-    Collection<CollectionT> inputs = graph.getPerElementInputs(transform);
+    Collection<CollectionT> inputs = graph.getPerElementInputs(executable);
     if (inputs.isEmpty()) {
       inputWatermarksBuilder.add(THE_END_OF_TIME);
     }
@@ -873,10 +873,10 @@ private TransformWatermarks getTransformWatermark(ExecutableT transform) {
    * AppliedPTransform PTransform} has not processed any elements, return a watermark of {@link
    * BoundedWindow#TIMESTAMP_MIN_VALUE}.
    *
-   * @return a snapshot of the input watermark and output watermark for the provided transform
+   * @return a snapshot of the input watermark and output watermark for the provided executable
    */
-  public TransformWatermarks getWatermarks(ExecutableT transform) {
-    return transformToWatermarks.get(transform);
+  public TransformWatermarks getWatermarks(ExecutableT executable) {
+    return transformToWatermarks.get(executable);
   }
 
   public void initialize(Map<ExecutableT, ? extends Iterable<CommittedBundle<?>>> initialBundles) {
@@ -896,9 +896,9 @@ public void initialize(Map<ExecutableT, ? extends Iterable<CommittedBundle<?>>>
   }
 
   /**
-   * Updates the watermarks of a transform with one or more inputs.
+   * Updates the watermarks of an executable with one or more inputs.
    *
-   * <p>Each transform has two monotonically increasing watermarks: the input watermark, which can,
+   * <p>Each executable has two monotonically increasing watermarks: the input watermark, which can,
    * at any time, be updated to equal:
    * <pre>
    * MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks))
@@ -908,24 +908,27 @@ public void initialize(Map<ExecutableT, ? extends Iterable<CommittedBundle<?>>>
    * MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))
    * </pre>.
    *
+   * <p>Updates to watermarks may not be immediately visible.
+   *
    * @param completed the input that has completed
-   * @param timerUpdate the timers that were added, removed, and completed as part of producing
-   *                    this update
-   * @param result the result that was produced by processing the input
-   * @param earliestHold the earliest watermark hold in the transform's state. {@code null} if there
-   *                     is no hold
+   * @param timerUpdate the timers that were added, removed, and completed as part of producing this
+   *     update
+   * @param executable the executable applied to {@code completed} to produce the outputs
+   * @param unprocessedInputs inputs that could not be processed
+   * @param outputs outputs that were produced by the application of the {@code executable} to the
+   *     input
+   * @param earliestHold the earliest watermark hold in the executable's state.
    */
   public void updateWatermarks(
       @Nullable CommittedBundle<?> completed,
       TimerUpdate timerUpdate,
-      CommittedResult<ExecutableT> result,
+      ExecutableT executable,
+      @Nullable CommittedBundle<?> unprocessedInputs,
+      Iterable<? extends CommittedBundle<?>> outputs,
       Instant earliestHold) {
-    pendingUpdates.offer(PendingWatermarkUpdate.create(
-        result.getExecutable(),
-        completed,
-        timerUpdate,
-        result,
-        earliestHold));
+    pendingUpdates.offer(
+        PendingWatermarkUpdate.create(
+            executable, completed, timerUpdate, unprocessedInputs, outputs, earliestHold));
     tryApplyPendingUpdates();
   }
 
@@ -952,10 +955,10 @@ private void applyAllPendingUpdates() {
     }
   }
 
-  @GuardedBy("refreshLock")
   /**
    * Applies up to {@code numUpdates}, or all available updates if numUpdates is non-positive.
    */
+  @GuardedBy("refreshLock")
   private void applyNUpdates(int numUpdates) {
     for (int i = 0; !pendingUpdates.isEmpty() && (i < numUpdates || numUpdates <= 0); i++) {
       PendingWatermarkUpdate<ExecutableT> pending = pendingUpdates.poll();
@@ -964,38 +967,48 @@ private void applyNUpdates(int numUpdates) {
     }
   }
 
+  /** Apply a {@link PendingWatermarkUpdate} to the {@link WatermarkManager}. */
   private void applyPendingUpdate(PendingWatermarkUpdate<ExecutableT> pending) {
-    CommittedResult<ExecutableT> result = pending.getResult();
-    ExecutableT transform = result.getExecutable();
+    ExecutableT executable = pending.getExecutable();
     CommittedBundle<?> inputBundle = pending.getInputBundle();
 
-    updatePending(inputBundle, pending.getTimerUpdate(), result);
+    updatePending(
+        inputBundle,
+        pending.getTimerUpdate(),
+        executable,
+        pending.getUnprocessedInputs(),
+        pending.getOutputs());
 
-    TransformWatermarks transformWms = transformToWatermarks.get(transform);
-    transformWms.setEventTimeHold(inputBundle == null ? null : inputBundle.getKey(),
-        pending.getEarliestHold());
+    TransformWatermarks transformWms = transformToWatermarks.get(executable);
+    transformWms.setEventTimeHold(
+        inputBundle == null ? null : inputBundle.getKey(), pending.getEarliestHold());
   }
 
   /**
    * First adds all produced elements to the queue of pending elements for each consumer, then adds
    * all pending timers to the collection of pending timers, then removes all completed and deleted
    * timers from the collection of pending timers, then removes all completed elements from the
-   * pending queue of the transform.
+   * pending queue of the executable.
    *
    * <p>It is required that all newly pending elements are added to the queue of pending elements
    * for each consumer prior to the completed elements being removed, as doing otherwise could cause
    * a Watermark to appear in a state in which the upstream (completed) element does not hold the
    * watermark but the element it produced is not yet pending. This can cause the watermark to
    * erroneously advance.
+   *
+   * <p>See {@link #updateWatermarks(CommittedBundle, TimerUpdate, Object, CommittedBundle,
+   * Iterable, Instant)} for information about the parameters of this method.
    */
   private void updatePending(
       CommittedBundle<?> input,
       TimerUpdate timerUpdate,
-      CommittedResult<ExecutableT> result) {
+      ExecutableT executable,
+      @Nullable CommittedBundle<?> unprocessedInputs,
+      Iterable<? extends CommittedBundle<?>> outputs) {
     // Newly pending elements must be added before completed elements are removed, as the two
     // do not share a Mutex within this call and thus can be interleaved with external calls to
     // refresh.
-    for (CommittedBundle<?> bundle : result.getOutputs()) {
+    for (CommittedBundle<?> bundle : outputs) {
       for (ExecutableT consumer :
           // TODO: Remove this cast once CommittedBundle returns a CollectionT
           graph.getPerElementConsumers((CollectionT) bundle.getPCollection())) {
@@ -1004,10 +1017,10 @@ private void updatePending(
       }
     }
 
-    TransformWatermarks completedTransform = transformToWatermarks.get(result.getExecutable());
-    if (result.getUnprocessedInputs().isPresent()) {
+    TransformWatermarks completedTransform = transformToWatermarks.get(executable);
+    if (unprocessedInputs != null) {
       // Add the unprocessed inputs
-      completedTransform.addPending(result.getUnprocessedInputs().get());
+      completedTransform.addPending(unprocessedInputs);
     }
     completedTransform.updateTimers(timerUpdate);
     if (input != null) {
@@ -1034,8 +1047,8 @@ synchronized void refreshAll() {
 
   private Set<ExecutableT> refreshAllOf(Set<ExecutableT> toRefresh) {
     Set<ExecutableT> newRefreshes = new HashSet<>();
-    for (ExecutableT transform : toRefresh) {
-      newRefreshes.addAll(refreshWatermarks(transform));
+    for (ExecutableT executable : toRefresh) {
+      newRefreshes.addAll(refreshWatermarks(executable));
     }
     return newRefreshes;
   }
@@ -1179,7 +1192,7 @@ public void removeHold(Object key) {
    * A reference to the input and output watermarks of an {@link AppliedPTransform}.
    */
   public class TransformWatermarks {
-    private final ExecutableT transform;
+    private final ExecutableT executable;
 
     private final AppliedPTransformInputWatermark inputWatermark;
     private final AppliedPTransformOutputWatermark outputWatermark;
@@ -1191,12 +1204,12 @@ public void removeHold(Object key) {
     private Instant latestSynchronizedOutputWm;
 
     private TransformWatermarks(
-        ExecutableT transform,
+        ExecutableT executable,
         AppliedPTransformInputWatermark inputWatermark,
         AppliedPTransformOutputWatermark outputWatermark,
         SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark,
         SynchronizedProcessingTimeOutputWatermark outputSynchProcessingWatermark) {
-      this.transform = transform;
+      this.executable = executable;
       this.inputWatermark = inputWatermark;
       this.outputWatermark = outputWatermark;
 
@@ -1284,7 +1297,7 @@ private void addPending(CommittedBundle<?> bundle) {
       for (Map.Entry<StructuralKey<?>, List<TimerData>> firedTimers :
           timersPerKey.entrySet()) {
         keyFiredTimers.add(
-            new FiredTimers<>(transform, firedTimers.getKey(), firedTimers.getValue()));
+            new FiredTimers<>(executable, firedTimers.getKey(), firedTimers.getValue()));
       }
       return keyFiredTimers;
     }
@@ -1470,26 +1483,26 @@ public boolean equals(Object other) {
 
   /**
    * A pair of {@link TimerData} and key which can be delivered to the appropriate
-   * {@link AppliedPTransform}. A timer fires at the transform that set it with a specific key when
+   * {@link AppliedPTransform}. A timer fires at the executable that set it with a specific key when
    * the time domain in which it lives progresses past a specified time, as determined by the
    * {@link WatermarkManager}.
    */
   public static class FiredTimers<ExecutableT> {
-    /** The transform the timers were set at and will be delivered to. */
-    private final ExecutableT transform;
+    /** The executable the timers were set at and will be delivered to. */
+    private final ExecutableT executable;
     /** The key the timers were set for and will be delivered to. */
     private final StructuralKey<?> key;
     private final Collection<TimerData> timers;
 
     private FiredTimers(
-        ExecutableT transform, StructuralKey<?> key, Collection<TimerData> timers) {
-      this.transform = transform;
+        ExecutableT executable, StructuralKey<?> key, Collection<TimerData> timers) {
+      this.executable = executable;
       this.key = key;
       this.timers = timers;
     }
 
-    public ExecutableT getTransform() {
-      return transform;
+    public ExecutableT getExecutable() {
+      return executable;
     }
 
     public StructuralKey<?> getKey() {
@@ -1522,17 +1535,6 @@ public int compare(CommittedBundle<?> o1, CommittedBundle<?> o2) {
     }
   }
 
-  public Set<ExecutableT> getCompletedTransforms() {
-    Set<ExecutableT> result = new HashSet<>();
-    for (Map.Entry<ExecutableT, TransformWatermarks> wms :
-        transformToWatermarks.entrySet()) {
-      if (wms.getValue().getOutputWatermark().equals(THE_END_OF_TIME.get())) {
-        result.add(wms.getKey());
-      }
-    }
-    return result;
-  }
-
   @AutoValue
   abstract static class PendingWatermarkUpdate<ExecutableT> {
     abstract ExecutableT getExecutable();
@@ -1542,18 +1544,22 @@ public int compare(CommittedBundle<?> o1, CommittedBundle<?> o2) {
 
     abstract TimerUpdate getTimerUpdate();
 
-    abstract CommittedResult<ExecutableT> getResult();
+    @Nullable
+    abstract CommittedBundle<?> getUnprocessedInputs();
+
+    abstract Iterable<? extends CommittedBundle<?>> getOutputs();
 
     abstract Instant getEarliestHold();
 
     public static <ExecutableT> PendingWatermarkUpdate<ExecutableT> create(
         ExecutableT executable,
-        CommittedBundle<?> inputBundle,
+        @Nullable CommittedBundle<?> inputBundle,
         TimerUpdate timerUpdate,
-        CommittedResult<ExecutableT> result,
+        @Nullable CommittedBundle<?> unprocessedInputs,
+        Iterable<? extends CommittedBundle<?>> outputs,
         Instant earliestHold) {
-      return new AutoValue_WatermarkManager_PendingWatermarkUpdate(
-          executable, inputBundle, timerUpdate, result, earliestHold);
+      return new AutoValue_WatermarkManager_PendingWatermarkUpdate<>(
+          executable, inputBundle, timerUpdate, unprocessedInputs, outputs, earliestHold);
     }
   }
 }
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 21dac7fbb51..03764c1a1ba 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -28,20 +28,16 @@
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.WatermarkManager.AppliedPTransformInputWatermark;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
@@ -145,8 +141,7 @@ public void processElement(ProcessContext c) throws Exception {
    */
   @Test
   public void getWatermarkForUntouchedTransform() {
-    TransformWatermarks watermarks =
-        manager.getWatermarks(graph.getProducer(createdInts));
+    TransformWatermarks watermarks = manager.getWatermarks(graph.getProducer(createdInts));
 
     assertThat(watermarks.getInputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
     assertThat(watermarks.getOutputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
@@ -159,11 +154,12 @@ public void getWatermarkForUntouchedTransform() {
   @Test
   public void getWatermarkForUpdatedSourceTransform() {
     CommittedBundle<Integer> output = multiWindowedBundle(createdInts, 1);
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(output)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.singleton(output),
         new Instant(8000L));
     manager.refreshAll();
     TransformWatermarks updatedSourceWatermark =
@@ -180,11 +176,12 @@ public void getWatermarkForUpdatedSourceTransform() {
   public void getWatermarkForMultiInputTransform() {
     CommittedBundle<Integer> secondPcollectionBundle = multiWindowedBundle(intsToFlatten, -1);
 
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(intsToFlatten),
-            null,
-            Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle)),
+        graph.getProducer(intsToFlatten),
+        null,
+        Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
 
@@ -217,20 +214,18 @@ public void getWatermarkForMultiInputTransform() {
     manager.updateWatermarks(
         secondPcollectionBundle,
         TimerUpdate.empty(),
-        result(
-            graph.getProducer(flattened),
-            secondPcollectionBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
+        graph.getProducer(flattened),
+        secondPcollectionBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks transformAfterProcessing =
         manager.getWatermarks(graph.getProducer(flattened));
     manager.updateWatermarks(
         secondPcollectionBundle,
         TimerUpdate.empty(),
-        result(
-            graph.getProducer(flattened),
-            secondPcollectionBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
+        graph.getProducer(flattened),
+        secondPcollectionBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     assertThat(
@@ -245,11 +240,12 @@ public void getWatermarkForMultiInputTransform() {
         timestampedBundle(createdInts, TimestampedValue.of(5, firstCollectionTimestamp));
     // the source is done, but elements are still buffered. The source output watermark should be
     // past the end of the global window
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks firstSourceWatermarks =
@@ -280,10 +276,9 @@ public void getWatermarkForMultiInputTransform() {
     manager.updateWatermarks(
         firstPcollectionBundle,
         TimerUpdate.empty(),
-        result(
-            graph.getProducer(flattened),
-            firstPcollectionBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(completedFlattenBundle)),
+        graph.getProducer(flattened),
+        firstPcollectionBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(completedFlattenBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks afterConsumingAllInput =
@@ -332,11 +327,9 @@ public void getWatermarkMultiIdenticalInput() {
     tstMgr.updateWatermarks(
         root,
         TimerUpdate.empty(),
-        CommittedResult.create(
-            StepTransformResult.withoutHold(graph.getProducer(created)).build(),
-            Optional.absent(),
-            Collections.singleton(createBundle),
-            EnumSet.allOf(OutputType.class)),
+        graph.getProducer(created),
+        null,
+        Collections.singleton(createBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     tstMgr.refreshAll();
@@ -346,11 +339,9 @@ public void getWatermarkMultiIdenticalInput() {
     tstMgr.updateWatermarks(
         createBundle,
         TimerUpdate.empty(),
-        CommittedResult.create(
-            StepTransformResult.withoutHold(theFlatten).build(),
-            Optional.absent(),
-            Collections.emptyList(),
-            EnumSet.allOf(OutputType.class)),
+        theFlatten,
+        null,
+        Collections.emptyList(),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     tstMgr.refreshAll();
@@ -359,11 +350,9 @@ public void getWatermarkMultiIdenticalInput() {
     tstMgr.updateWatermarks(
         createBundle,
         TimerUpdate.empty(),
-        CommittedResult.create(
-            StepTransformResult.withoutHold(theFlatten).build(),
-            Optional.absent(),
-            Collections.emptyList(),
-            EnumSet.allOf(OutputType.class)),
+        theFlatten,
+        null,
+        Collections.emptyList(),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     tstMgr.refreshAll();
     assertThat(flattenWms.getInputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
@@ -378,11 +367,12 @@ public void getWatermarkForMultiConsumedCollection() {
     CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
         TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)),
         TimestampedValue.of(3, new Instant(-1000L)));
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(createdBundle)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.<CommittedBundle<?>>singleton(createdBundle),
         new Instant(Long.MAX_VALUE));
     manager.refreshAll();
     TransformWatermarks createdAfterProducing =
@@ -398,10 +388,9 @@ public void getWatermarkForMultiConsumedCollection() {
     manager.updateWatermarks(
         createdBundle,
         TimerUpdate.empty(),
-        result(
-            graph.getProducer(keyed),
-            createdBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(keyBundle)),
+        graph.getProducer(keyed),
+        createdBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(keyBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks keyedWatermarks =
@@ -421,10 +410,9 @@ public void getWatermarkForMultiConsumedCollection() {
     manager.updateWatermarks(
         createdBundle,
         TimerUpdate.empty(),
-        result(
-            graph.getProducer(filtered),
-            createdBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(filteredBundle)),
+        graph.getProducer(filtered),
+        createdBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(filteredBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks filteredProcessedWatermarks =
@@ -447,11 +435,12 @@ public void updateWatermarkWithWatermarkHolds() {
         TimestampedValue.of(1, new Instant(1_000_000L)),
         TimestampedValue.of(2, new Instant(1234L)),
         TimestampedValue.of(3, new Instant(-1000L)));
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(createdBundle)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.<CommittedBundle<?>>singleton(createdBundle),
         new Instant(Long.MAX_VALUE));
 
     CommittedBundle<KV<String, Integer>> keyBundle = timestampedBundle(keyed,
@@ -461,10 +450,9 @@ public void updateWatermarkWithWatermarkHolds() {
     manager.updateWatermarks(
         createdBundle,
         TimerUpdate.empty(),
-        result(
             graph.getProducer(keyed),
             createdBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(keyBundle)),
+            Collections.<CommittedBundle<?>>singleton(keyBundle),
         new Instant(500L));
     manager.refreshAll();
     TransformWatermarks keyedWatermarks =
@@ -493,28 +481,27 @@ public void updateWatermarkWithKeyedWatermarkHolds() {
         .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L)))
         .commit(clock.now());
 
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            ImmutableList.of(firstKeyBundle, secondKeyBundle)),
+        graph.getProducer(createdInts),
+        null,
+        ImmutableList.of(firstKeyBundle, secondKeyBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     manager.updateWatermarks(
         firstKeyBundle,
         TimerUpdate.empty(),
-        result(
             graph.getProducer(filtered),
             firstKeyBundle.withElements(Collections.emptyList()),
-            Collections.emptyList()),
+            Collections.emptyList(),
         new Instant(-1000L));
     manager.updateWatermarks(
         secondKeyBundle,
         TimerUpdate.empty(),
-        result(
             graph.getProducer(filtered),
             secondKeyBundle.withElements(Collections.emptyList()),
-            Collections.emptyList()),
+            Collections.emptyList(),
         new Instant(1234L));
     manager.refreshAll();
 
@@ -530,10 +517,9 @@ public void updateWatermarkWithKeyedWatermarkHolds() {
     manager.updateWatermarks(
         fauxFirstKeyTimerBundle,
         TimerUpdate.empty(),
-        result(
             graph.getProducer(filtered),
             fauxFirstKeyTimerBundle.withElements(Collections.emptyList()),
-            Collections.emptyList()),
+            Collections.emptyList(),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
 
@@ -545,10 +531,9 @@ public void updateWatermarkWithKeyedWatermarkHolds() {
     manager.updateWatermarks(
         fauxSecondKeyTimerBundle,
         TimerUpdate.empty(),
-        result(
             graph.getProducer(filtered),
             fauxSecondKeyTimerBundle.withElements(Collections.emptyList()),
-            Collections.emptyList()),
+            Collections.emptyList(),
         new Instant(5678L));
     manager.refreshAll();
     assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L)));
@@ -556,10 +541,9 @@ public void updateWatermarkWithKeyedWatermarkHolds() {
     manager.updateWatermarks(
         fauxSecondKeyTimerBundle,
         TimerUpdate.empty(),
-        result(
             graph.getProducer(filtered),
             fauxSecondKeyTimerBundle.withElements(Collections.emptyList()),
-            Collections.emptyList()),
+            Collections.emptyList(),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     assertThat(filteredWatermarks.getOutputWatermark(),
@@ -574,10 +558,12 @@ public void updateWatermarkWithKeyedWatermarkHolds() {
   public void updateOutputWatermarkShouldBeMonotonic() {
     CommittedBundle<?> firstInput =
         bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.updateWatermarks(null,  TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(firstInput)),
+    manager.updateWatermarks(
+        null,
+        TimerUpdate.empty(),
+        graph.getProducer(createdInts),
+        null,
+        Collections.<CommittedBundle<?>>singleton(firstInput),
         new Instant(0L));
     manager.refreshAll();
     TransformWatermarks firstWatermarks =
@@ -588,9 +574,9 @@ public void updateOutputWatermarkShouldBeMonotonic() {
         bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
+        graph.getProducer(createdInts),
             null,
-            Collections.<CommittedBundle<?>>singleton(secondInput)),
+            Collections.<CommittedBundle<?>>singleton(secondInput),
         new Instant(-250L));
     manager.refreshAll();
     TransformWatermarks secondWatermarks =
@@ -610,9 +596,9 @@ public void updateWatermarkWithHoldsShouldBeMonotonic() {
         TimestampedValue.of(3, new Instant(-1000L)));
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
+        graph.getProducer(createdInts),
             null,
-            Collections.<CommittedBundle<?>>singleton(createdBundle)),
+            Collections.<CommittedBundle<?>>singleton(createdBundle),
         new Instant(Long.MAX_VALUE));
 
     CommittedBundle<KV<String, Integer>> keyBundle =
@@ -622,10 +608,9 @@ public void updateWatermarkWithHoldsShouldBeMonotonic() {
     manager.updateWatermarks(
         createdBundle,
         TimerUpdate.empty(),
-        result(
             graph.getProducer(keyed),
             createdBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(keyBundle)),
+            Collections.<CommittedBundle<?>>singleton(keyBundle),
         new Instant(500L));
     manager.refreshAll();
     TransformWatermarks keyedWatermarks =
@@ -657,20 +642,22 @@ public void updateWatermarkWithUnprocessedElements() {
         .add(third)
         .commit(clock.now());
 
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(createdBundle)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.<CommittedBundle<?>>singleton(createdBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     CommittedBundle<KV<String, Integer>> keyBundle = timestampedBundle(keyed,
         TimestampedValue.of(KV.of("MyKey", 1), BoundedWindow.TIMESTAMP_MIN_VALUE));
-    manager.updateWatermarks(createdBundle,
+    manager.updateWatermarks(
+        createdBundle,
         TimerUpdate.empty(),
-        result(graph.getProducer(keyed),
-            createdBundle.withElements(ImmutableList.of(second, third)),
-            Collections.<CommittedBundle<?>>singleton(keyBundle)),
+        graph.getProducer(keyed),
+        createdBundle.withElements(ImmutableList.of(second, third)),
+        Collections.<CommittedBundle<?>>singleton(keyBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks keyedWatermarks =
         manager.getWatermarks(graph.getProducer(keyed));
@@ -692,20 +679,20 @@ public void updateWatermarkWithCompletedElementsNotPending() {
         .add(second)
         .commit(clock.now());
 
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(createdBundle)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.<CommittedBundle<?>>singleton(createdBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     manager.updateWatermarks(
         neverCreatedBundle,
         TimerUpdate.empty(),
-        result(
             graph.getProducer(filtered),
             neverCreatedBundle.withElements(Collections.emptyList()),
-            Collections.emptyList()),
+            Collections.emptyList(),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     manager.refreshAll();
@@ -723,11 +710,12 @@ public void updateWatermarkWithLateData() {
     CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
         TimestampedValue.of(1, sourceWatermark), TimestampedValue.of(2, new Instant(1234L)));
 
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(createdBundle)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.<CommittedBundle<?>>singleton(createdBundle),
         sourceWatermark);
 
     CommittedBundle<KV<String, Integer>> keyBundle =
@@ -738,10 +726,9 @@ public void updateWatermarkWithLateData() {
     manager.updateWatermarks(
         createdBundle,
         TimerUpdate.empty(),
-        result(
             graph.getProducer(keyed),
             createdBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(keyBundle)),
+            Collections.<CommittedBundle<?>>singleton(keyBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks onTimeWatermarks =
@@ -756,10 +743,9 @@ public void updateWatermarkWithLateData() {
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        result(
             graph.getProducer(createdInts),
             createdBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(lateDataBundle)),
+            Collections.<CommittedBundle<?>>singleton(lateDataBundle),
         new Instant(2_000_000L));
     manager.refreshAll();
     TransformWatermarks bufferedLateWm =
@@ -778,10 +764,9 @@ public void updateWatermarkWithLateData() {
     manager.updateWatermarks(
         lateDataBundle,
         TimerUpdate.empty(),
-        result(
-            graph.getProducer(keyed),
-            lateDataBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(lateKeyedBundle)),
+        graph.getProducer(keyed),
+        lateDataBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(lateKeyedBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
   }
@@ -789,12 +774,14 @@ public void updateWatermarkWithLateData() {
   public void updateWatermarkWithDifferentWindowedValueInstances() {
     manager.updateWatermarks(
         null,
-        TimerUpdate.empty(), result(graph.getProducer(createdInts), null,
+        TimerUpdate.empty(),
+        graph.getProducer(createdInts),
+        null,
         Collections.<CommittedBundle<?>>singleton(
             bundleFactory
                 .createBundle(createdInts)
                 .add(WindowedValue.valueInGlobalWindow(1))
-                .commit(Instant.now()))),
+                .commit(Instant.now())),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     CommittedBundle<Integer> createdBundle = bundleFactory.createBundle(createdInts)
@@ -803,10 +790,9 @@ public void updateWatermarkWithDifferentWindowedValueInstances() {
     manager.updateWatermarks(
         createdBundle,
         TimerUpdate.empty(),
-        result(
-            graph.getProducer(keyed),
-            createdBundle.withElements(Collections.emptyList()),
-            Collections.emptyList()),
+        graph.getProducer(keyed),
+        createdBundle.withElements(Collections.emptyList()),
+        Collections.emptyList(),
         null);
     manager.refreshAll();
     TransformWatermarks onTimeWatermarks =
@@ -821,11 +807,12 @@ public void updateWatermarkWithDifferentWindowedValueInstances() {
   @Test
   public void getWatermarksAfterOnlyEmptyOutput() {
     CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts);
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks updatedSourceWatermarks =
@@ -852,21 +839,21 @@ public void getWatermarksAfterOnlyEmptyOutput() {
   @Test
   public void getWatermarksAfterHoldAndEmptyOutput() {
     CommittedBundle<Integer> firstCreateOutput = multiWindowedBundle(createdInts, 1, 2);
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(firstCreateOutput)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.<CommittedBundle<?>>singleton(firstCreateOutput),
         new Instant(12_000L));
 
     CommittedBundle<Integer> firstFilterOutput = multiWindowedBundle(filtered);
     manager.updateWatermarks(
         firstCreateOutput,
         TimerUpdate.empty(),
-        result(
-            graph.getProducer(filtered),
-            firstCreateOutput.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(firstFilterOutput)),
+        graph.getProducer(filtered),
+        firstCreateOutput.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(firstFilterOutput),
         new Instant(10_000L));
     manager.refreshAll();
     TransformWatermarks firstFilterWatermarks =
@@ -875,11 +862,12 @@ public void getWatermarksAfterHoldAndEmptyOutput() {
     assertThat(firstFilterWatermarks.getOutputWatermark(), not(laterThan(new Instant(10_000L))));
 
     CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts);
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks updatedSourceWatermarks =
@@ -919,11 +907,12 @@ public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() {
     CommittedBundle<Integer> createOutput =
         bundleFactory.createBundle(createdInts).commit(new Instant(1250L));
 
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(createOutput)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.<CommittedBundle<?>>singleton(createOutput),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks createAfterUpdate =
@@ -953,10 +942,9 @@ public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() {
     manager.updateWatermarks(
         createOutput,
         TimerUpdate.empty(),
-        result(
-            graph.getProducer(filtered),
-            createOutput.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(filterOutputBundle)),
+        graph.getProducer(filtered),
+        createOutput.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(filterOutputBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks filterAfterConsumed =
@@ -978,11 +966,12 @@ public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() {
   //  @Test
   public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(createdInts, 1, 2, 4, 8);
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(createdBundle)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.<CommittedBundle<?>>singleton(createdBundle),
         new Instant(1248L));
     manager.refreshAll();
 
@@ -1004,10 +993,9 @@ public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
     manager.updateWatermarks(
         createdBundle,
         timers,
-        result(
-            graph.getProducer(filtered),
-            createdBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(filteredBundle)),
+        graph.getProducer(filtered),
+        createdBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(filteredBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     Instant startTime = clock.now();
@@ -1041,10 +1029,9 @@ public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
     manager.updateWatermarks(
         filteredTimerBundle,
         TimerUpdate.builder(key).withCompletedTimers(Collections.singleton(pastTimer)).build(),
-        result(
-            graph.getProducer(filtered),
-            filteredTimerBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(filteredTimerResult)),
+        graph.getProducer(filtered),
+        filteredTimerBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(filteredTimerResult),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
 
@@ -1058,10 +1045,9 @@ public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
     manager.updateWatermarks(
         filteredTimerResult,
         TimerUpdate.empty(),
-        result(
-            graph.getProducer(filteredTimesTwo),
-            filteredTimerResult.withElements(Collections.emptyList()),
-            Collections.emptyList()),
+        graph.getProducer(filteredTimesTwo),
+        filteredTimerResult.withElements(Collections.emptyList()),
+        Collections.emptyList(),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
@@ -1096,26 +1082,27 @@ public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() {
     CommittedBundle<Integer> createOutput =
         bundleFactory.createBundle(createdInts).commit(new Instant(1250L));
 
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(createOutput)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.<CommittedBundle<?>>singleton(createOutput),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
-    TransformWatermarks createAfterUpdate =
-        manager.getWatermarks(graph.getProducer(createdInts));
+    TransformWatermarks createAfterUpdate = manager.getWatermarks(graph.getProducer(createdInts));
     assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), not(laterThan(clock.now())));
-    assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(),
-        not(laterThan(clock.now())));
+    assertThat(
+        createAfterUpdate.getSynchronizedProcessingOutputTime(), not(laterThan(clock.now())));
 
     CommittedBundle<Integer> createSecondOutput =
         bundleFactory.createBundle(createdInts).commit(new Instant(750L));
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(createSecondOutput)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.<CommittedBundle<?>>singleton(createSecondOutput),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
 
@@ -1125,11 +1112,12 @@ public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() {
   @Test
   public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers() {
     CommittedBundle<Integer> created = multiWindowedBundle(createdInts, 1, 2, 3);
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(created)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.<CommittedBundle<?>>singleton(created),
         new Instant(40_900L));
     manager.refreshAll();
 
@@ -1142,15 +1130,13 @@ public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers(
         TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
             .setTimer(upstreamProcessingTimer)
             .build(),
-        result(
-            graph.getProducer(filtered),
-            created.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(filteredBundle)),
+        graph.getProducer(filtered),
+        created.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(filteredBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
 
-    TransformWatermarks downstreamWms =
-        manager.getWatermarks(graph.getProducer(filteredTimesTwo));
+    TransformWatermarks downstreamWms = manager.getWatermarks(graph.getProducer(filteredTimesTwo));
     assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
 
     clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1167,10 +1153,9 @@ public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers(
         TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
             .withCompletedTimers(Collections.singleton(upstreamProcessingTimer))
             .build(),
-        result(
-            graph.getProducer(filtered),
-            otherCreated.withElements(Collections.emptyList()),
-            Collections.emptyList()),
+        graph.getProducer(filtered),
+        otherCreated.withElements(Collections.emptyList()),
+        Collections.emptyList(),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
 
@@ -1183,9 +1168,9 @@ public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() {
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.<CommittedBundle<?>>singleton(created)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.<CommittedBundle<?>>singleton(created),
         new Instant(29_919_235L));
 
     Instant upstreamHold = new Instant(2048L);
@@ -1195,15 +1180,13 @@ public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() {
     manager.updateWatermarks(
         created,
         TimerUpdate.empty(),
-        result(
-            graph.getProducer(filtered),
-            created.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(filteredBundle)),
+        graph.getProducer(filtered),
+        created.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(filteredBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
 
-    TransformWatermarks downstreamWms =
-        manager.getWatermarks(graph.getProducer(filteredTimesTwo));
+    TransformWatermarks downstreamWms = manager.getWatermarks(graph.getProducer(filteredTimesTwo));
     assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
 
     clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1219,11 +1202,12 @@ public void extractFiredTimersReturnsFiredEventTimeTimers() {
 
     // Advance WM of keyed past the first timer, but ahead of the second and third
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.singleton(createdBundle)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.singleton(createdBundle),
         new Instant(1500L));
     manager.refreshAll();
 
@@ -1244,10 +1228,9 @@ public void extractFiredTimersReturnsFiredEventTimeTimers() {
     manager.updateWatermarks(
         createdBundle,
         update,
-        result(
-            graph.getProducer(filtered),
-            createdBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
+        graph.getProducer(filtered),
+        createdBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
         new Instant(1000L));
     manager.refreshAll();
 
@@ -1260,7 +1243,9 @@ public void extractFiredTimersReturnsFiredEventTimeTimers() {
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts), null, Collections.emptyList()),
+        graph.getProducer(createdInts),
+        null,
+        Collections.emptyList(),
         new Instant(50_000L));
     manager.refreshAll();
     Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> secondFiredTimers =
@@ -1281,11 +1266,12 @@ public void extractFiredTimersReturnsFiredProcessingTimeTimers() {
 
     // Advance WM of keyed past the first timer, but ahead of the second and third
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.singleton(createdBundle)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.singleton(createdBundle),
         new Instant(1500L));
 
     TimerData earliestTimer =
@@ -1305,10 +1291,9 @@ public void extractFiredTimersReturnsFiredProcessingTimeTimers() {
     manager.updateWatermarks(
         createdBundle,
         update,
-        result(
-            graph.getProducer(filtered),
-            createdBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
+        graph.getProducer(filtered),
+        createdBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
         new Instant(1000L));
     manager.refreshAll();
 
@@ -1322,7 +1307,9 @@ public void extractFiredTimersReturnsFiredProcessingTimeTimers() {
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts), null, Collections.emptyList()),
+        graph.getProducer(createdInts),
+        null,
+        Collections.emptyList(),
         new Instant(50_000L));
     manager.refreshAll();
     Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> secondFiredTimers =
@@ -1343,11 +1330,12 @@ public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() {
 
     // Advance WM of keyed past the first timer, but ahead of the second and third
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.singleton(createdBundle)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.singleton(createdBundle),
         new Instant(1500L));
 
     TimerData earliestTimer = TimerData.of(
@@ -1367,10 +1355,9 @@ public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() {
     manager.updateWatermarks(
         createdBundle,
         update,
-        result(
-            graph.getProducer(filtered),
-            createdBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
+        graph.getProducer(filtered),
+        createdBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
         new Instant(1000L));
     manager.refreshAll();
 
@@ -1384,7 +1371,9 @@ public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() {
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts), null, Collections.emptyList()),
+        graph.getProducer(createdInts),
+        null,
+        Collections.emptyList(),
         new Instant(50_000L));
     manager.refreshAll();
     Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> secondFiredTimers =
@@ -1419,7 +1408,9 @@ public void processingTimeTimersCanBeReset() {
     manager.updateWatermarks(
         null,
         initialUpdate,
-        result(graph.getProducer(createdInts), null, Collections.emptyList()),
+        graph.getProducer(createdInts),
+        null,
+        Collections.emptyList(),
         new Instant(5000L));
     manager.refreshAll();
 
@@ -1427,7 +1418,9 @@ public void processingTimeTimersCanBeReset() {
     manager.updateWatermarks(
         null,
         overridingUpdate,
-        result(graph.getProducer(createdInts), null, Collections.emptyList()),
+        graph.getProducer(createdInts),
+        null,
+        Collections.emptyList(),
         new Instant(10000L));
 
     // Set clock past the timers.
@@ -1462,10 +1455,9 @@ public void eventTimeTimersCanBeReset() {
     manager.updateWatermarks(
         createdBundle,
         initialUpdate,
-        result(
-            graph.getProducer(filtered),
-            createdBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
+        graph.getProducer(filtered),
+        createdBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
         new Instant(1000L));
     manager.refreshAll();
 
@@ -1473,19 +1465,19 @@ public void eventTimeTimersCanBeReset() {
     manager.updateWatermarks(
         createdBundle,
         overridingUpdate,
-        result(
-            graph.getProducer(filtered),
-            createdBundle.withElements(Collections.emptyList()),
-            Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
+        graph.getProducer(filtered),
+        createdBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
         new Instant(1000L));
     manager.refreshAll();
 
     // Set WM past the timers.
-    manager.updateWatermarks(null,
+    manager.updateWatermarks(
+        null,
         TimerUpdate.empty(),
-        result(graph.getProducer(createdInts),
-            null,
-            Collections.singleton(createdBundle)),
+        graph.getProducer(createdInts),
+        null,
+        Collections.singleton(createdBundle),
         new Instant(3000L));
     manager.refreshAll();
 
@@ -1709,23 +1701,4 @@ public void describeTo(Description description) {
     }
     return bundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
-
-  private CommittedResult<AppliedPTransform<?, ?, ?>> result(
-      AppliedPTransform<?, ?, ?> transform,
-      @Nullable CommittedBundle<?> unprocessedBundle,
-      Iterable<? extends CommittedBundle<?>> bundles) {
-    Optional<? extends CommittedBundle<?>> unprocessedElements;
-    if (unprocessedBundle == null || Iterables.isEmpty(unprocessedBundle.getElements())) {
-      unprocessedElements = Optional.absent();
-    } else {
-      unprocessedElements = Optional.of(unprocessedBundle);
-    }
-    return CommittedResult.create(
-        StepTransformResult.withoutHold(transform).build(),
-        unprocessedElements,
-        bundles,
-        Iterables.isEmpty(bundles)
-            ? EnumSet.noneOf(OutputType.class)
-            : EnumSet.of(OutputType.BUNDLE));
-  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 93061)
    Time Spent: 1h 50m  (was: 1h 40m)

> Remove Use of Java SDK Types in the DirectRunner "engine"
> ---------------------------------------------------------
>
>                 Key: BEAM-4135
>                 URL: https://issues.apache.org/jira/browse/BEAM-4135
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-direct
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> The "engine" consists of the components which determine where to schedule work and route it to the appropriate processors, such as WatermarkManager, DirectBundleProcessor, and their associated classes.
>  
> These engine components never inspect the actual characteristics of the packaged work (e.g. the PCollection is a token, rather than a rich object), so they should not require use of a PCollection directly - instead, they can be generic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message