beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/3] incubator-beam git commit: Input watermarks can never be null.
Date Fri, 25 Mar 2016 20:48:43 GMT
Input watermarks can never be null.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34b33014
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34b33014
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34b33014

Branch: refs/heads/master
Commit: 34b3301413a16be6697c5d20896f0f85ddc65cf1
Parents: 1c89a1b
Author: Mark Shields <markshields@google.com>
Authored: Fri Mar 4 15:16:33 2016 -0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Fri Mar 25 13:37:18 2016 -0700

----------------------------------------------------------------------
 .../inprocess/InMemoryWatermarkManager.java       |  3 ++-
 .../inprocess/InProcessTimerInternals.java        |  1 -
 .../sdk/util/LateDataDroppingDoFnRunner.java      |  3 +--
 .../cloud/dataflow/sdk/util/PaneInfoTracker.java  |  2 +-
 .../dataflow/sdk/util/ReduceFnContextFactory.java |  1 -
 .../cloud/dataflow/sdk/util/ReduceFnRunner.java   | 18 +++++++++---------
 .../cloud/dataflow/sdk/util/TimerInternals.java   |  4 ++--
 .../google/cloud/dataflow/sdk/util/Timers.java    |  3 +--
 .../dataflow/sdk/util/TriggerContextFactory.java  |  1 -
 .../cloud/dataflow/sdk/util/WatermarkHold.java    |  9 +++------
 .../cloud/dataflow/sdk/io/CountingInputTest.java  |  1 +
 .../cloud/dataflow/sdk/util/ReduceFnTester.java   |  8 +++-----
 .../cloud/dataflow/sdk/util/TriggerTester.java    | 10 ++++------
 13 files changed, 27 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
index a9a62a6..c4d67db 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
@@ -28,6 +28,7 @@ import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -994,7 +995,7 @@ public class InMemoryWatermarkManager {
      * Returns the input watermark of the {@link AppliedPTransform}.
      */
     public Instant getInputWatermark() {
-      return inputWatermark.get();
+      return Preconditions.checkNotNull(inputWatermark.get());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java
index 06ba7b8..1d075c5 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java
@@ -70,7 +70,6 @@ public class InProcessTimerInternals implements TimerInternals {
   }
 
   @Override
-  @Nullable
   public Instant currentInputWatermarkTime() {
     return watermarks.getInputWatermark();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java
index 31927ab..3dfa064 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java
@@ -138,8 +138,7 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends
BoundedWin
     /** Is {@code window} expired w.r.t. the garbage collection watermark? */
     private boolean canDropDueToExpiredWindow(BoundedWindow window) {
       Instant inputWM = timerInternals.currentInputWatermarkTime();
-      return inputWM != null
-          && window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
+      return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
index a7818a3..9fa36b0 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
@@ -103,7 +103,7 @@ public class PaneInfoTracker {
     boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;
 
     // True is the input watermark hasn't passed the window's max timestamp.
-    boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp);
+    boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp);
 
     Timing timing;
     if (isLateForOutput || !onlyEarlyPanesSoFar) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
index bdbaf10..7649d52 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
@@ -146,7 +146,6 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow>
{
     }
 
     @Override
-    @Nullable
     public Instant currentEventTime() {
       return timerInternals.currentInputWatermarkTime();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index f1d4582..560d8ec 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -537,6 +537,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
           "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer,
window);
     }
 
+    // If this is an end-of-window timer then, we need to set a GC timer
+    boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
+      && timer.getTimestamp().equals(window.maxTimestamp());
+
     // If this is a garbage collection timer then we should trigger and garbage collect the
window.
     Instant cleanupTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
     boolean isGarbageCollection =
@@ -553,7 +557,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
         // We need to call onTrigger to emit the final pane if required.
         // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
         // and the watermark has passed the end of the window.
-        onTrigger(directContext, renamedContext, true/* isFinished */);
+        onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow);
       }
 
       // Cleanup flavor B: Clear all the remaining state for this window since we'll never
@@ -569,9 +573,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
         emitIfAppropriate(directContext, renamedContext);
       }
 
-      // If this is an end-of-window timer then, we need to set a GC timer
-      boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
-          && timer.getTimestamp().equals(window.maxTimestamp());
       if (isEndOfWindow) {
         // If the window strategy trigger includes a watermark trigger then at this point
         // there should be no data holds, either because we'd already cleared them on an
@@ -676,7 +677,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
     // Run onTrigger to produce the actual pane contents.
     // As a side effect it will clear all element holds, but not necessarily any
     // end-of-window or garbage collection holds.
-    onTrigger(directContext, renamedContext, isFinished);
+    onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/);
 
     // Now that we've triggered, the pane is empty.
     nonEmptyPanes.clearPane(renamedContext.state());
@@ -723,10 +724,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
   private void onTrigger(
       final ReduceFn<K, InputT, OutputT, W>.Context directContext,
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
-      boolean isFinished)
+      boolean isFinished, boolean isEndOfWindow)
           throws Exception {
     Instant inputWM = timerInternals.currentInputWatermarkTime();
-    Preconditions.checkNotNull(inputWM);
 
     // Prefetch necessary states
     ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
@@ -747,7 +747,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
     final Instant outputTimestamp = pair.oldHold;
     @Nullable Instant newHold = pair.newHold;
 
-    if (newHold != null && inputWM != null) {
+    if (newHold != null) {
       // We can't be finished yet.
       Preconditions.checkState(
         !isFinished, "new hold at %s but finished %s", newHold, directContext.window());
@@ -825,7 +825,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
     Instant endOfWindow = directContext.window().maxTimestamp();
     Instant fireTime;
     String which;
-    if (inputWM != null && endOfWindow.isBefore(inputWM)) {
+    if (endOfWindow.isBefore(inputWM)) {
       fireTime = endOfWindow.plus(windowingStrategy.getAllowedLateness());
       which = "garbage collection";
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
index c823ed3..b26e6e8 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
@@ -79,10 +79,11 @@ public interface TimerInternals {
 
   /**
    * Return the current, local input watermark timestamp for this computation
-   * in the {@link TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown.
+   * in the {@link TimeDomain#EVENT_TIME} time domain.
    *
    * <p>This value:
    * <ol>
+   * <li>Is never {@literal null}, but may be {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
    * <li>Is monotonically increasing.
    * <li>May differ between workers due to network and other delays.
    * <li>Will never be ahead of the global input watermark for this computation. But
it
@@ -95,7 +96,6 @@ public interface TimerInternals {
    * it is possible for an element to be considered locally on-time even though it is
    * globally late.
    */
-  @Nullable
   Instant currentInputWatermarkTime();
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
index 7d4b4f2..2ddf524 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
@@ -54,7 +54,6 @@ public interface Timers {
   @Nullable
   public abstract Instant currentSynchronizedProcessingTime();
 
-  /** Returns the current event time or {@code null} if unknown. */
-  @Nullable
+  /** Returns the current event time. */
   public abstract Instant currentEventTime();
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
index 64ff402..50e8b32 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
@@ -209,7 +209,6 @@ public class TriggerContextFactory<W extends BoundedWindow> {
     }
 
     @Override
-    @Nullable
     public Instant currentEventTime() {
       return timers.currentEventTime();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
index 31e36c5..7f814c4 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
@@ -228,7 +228,6 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable
{
 
     Instant outputWM = timerInternals.currentOutputWatermarkTime();
     Instant inputWM = timerInternals.currentInputWatermarkTime();
-    Preconditions.checkNotNull(inputWM);
 
     // Only add the hold if we can be sure:
     // - the backend will be able to respect it
@@ -242,7 +241,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable
{
     if (outputWM != null && elementHold.isBefore(outputWM)) {
       which = "too late to effect output watermark";
       tooLate = true;
-    } else if (inputWM != null && context.window().maxTimestamp().isBefore(inputWM))
{
+    } else if (context.window().maxTimestamp().isBefore(inputWM)) {
       which = "too late for end-of-window timer";
       tooLate = true;
     } else {
@@ -288,12 +287,11 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable
{
     // by the end of window (ie the end of window is at or ahead of the input watermark).
     Instant outputWM = timerInternals.currentOutputWatermarkTime();
     Instant inputWM = timerInternals.currentInputWatermarkTime();
-    Preconditions.checkNotNull(inputWM);
 
     String which;
     boolean tooLate;
     Instant eowHold = context.window().maxTimestamp();
-    if (inputWM != null && eowHold.isBefore(inputWM)) {
+    if (eowHold.isBefore(inputWM)) {
       which = "too late for end-of-window timer";
       tooLate = true;
     } else {
@@ -332,13 +330,12 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable
{
       Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness());
       Instant outputWM = timerInternals.currentOutputWatermarkTime();
       Instant inputWM = timerInternals.currentInputWatermarkTime();
-      Preconditions.checkNotNull(inputWM);
 
       WindowTracing.trace(
           "WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for "
           + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
           gcHold, context.key(), context.window(), inputWM, outputWM);
-      Preconditions.checkState(inputWM == null || !gcHold.isBefore(inputWM),
+      Preconditions.checkState(!gcHold.isBefore(inputWM),
           "Garbage collection hold %s cannot be before input watermark %s", gcHold, inputWM);
       context.state().access(EXTRA_HOLD_TAG).add(gcHold);
       return gcHold;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
index 1daadc7..cc60953 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
@@ -1,3 +1,4 @@
+
 /*
  * Copyright (C) 2016 Google Inc.
  *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
index d4620a7..4aeaa0c 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
@@ -599,7 +599,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow>
{
 
     /** Current input watermark. */
     @Nullable
-    private Instant inputWatermarkTime = null;
+    private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
     /** Current output watermark. */
     @Nullable
@@ -666,9 +666,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow>
{
     }
 
     @Override
-    @Nullable
     public Instant currentInputWatermarkTime() {
-      return inputWatermarkTime;
+      return Preconditions.checkNotNull(inputWatermarkTime);
     }
 
     @Override
@@ -692,7 +691,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow>
{
         ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark) throws Exception
{
       Preconditions.checkNotNull(newInputWatermark);
       Preconditions.checkState(
-          inputWatermarkTime == null || !newInputWatermark.isBefore(inputWatermarkTime),
+          !newInputWatermark.isBefore(inputWatermarkTime),
           "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
           newInputWatermark);
       WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}",
@@ -713,7 +712,6 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow>
{
 
     public void advanceOutputWatermark(Instant newOutputWatermark) {
       Preconditions.checkNotNull(newOutputWatermark);
-      Preconditions.checkNotNull(inputWatermarkTime);
       if (newOutputWatermark.isAfter(inputWatermarkTime)) {
         WindowTracing.trace(
             "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {}
to {}",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
index 0c71830..f291438 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
@@ -41,6 +41,7 @@ import com.google.cloud.dataflow.sdk.util.state.StateTag;
 import com.google.cloud.dataflow.sdk.util.state.WatermarkHoldState;
 import com.google.cloud.dataflow.sdk.values.TimestampedValue;
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -428,7 +429,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
 
     /** Current input watermark. */
     @Nullable
-    private Instant inputWatermarkTime = null;
+    private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
     /** Current output watermark. */
     @Nullable
@@ -471,9 +472,8 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
     }
 
     @Override
-    @Nullable
     public Instant currentInputWatermarkTime() {
-      return inputWatermarkTime;
+      return Preconditions.checkNotNull(inputWatermarkTime);
     }
 
     @Override
@@ -495,7 +495,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
 
     public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
       checkNotNull(newInputWatermark);
-      checkState(inputWatermarkTime == null || !newInputWatermark.isBefore(inputWatermarkTime),
+      checkState(!newInputWatermark.isBefore(inputWatermarkTime),
           "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
           newInputWatermark);
       WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}",
@@ -513,7 +513,6 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
 
     private void advanceOutputWatermark(Instant newOutputWatermark) throws Exception {
       checkNotNull(newOutputWatermark);
-      checkNotNull(inputWatermarkTime);
       if (newOutputWatermark.isAfter(inputWatermarkTime)) {
         WindowTracing.trace(
             "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {}
to {}",
@@ -577,7 +576,6 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
     }
 
     @Override
-    @Nullable
     public Instant currentEventTime() {
       return timerInternals.currentInputWatermarkTime();
     }


Mime
View raw message