beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] beam git commit: [BEAM-1689] Apply changes for Flink's StatefulDoFnRunner to the primary StatefulDoFnRunner
Date Tue, 14 Mar 2017 10:55:34 GMT
Repository: beam
Updated Branches:
  refs/heads/master 9ad01fb15 -> e362e6b49


[BEAM-1689] Apply changes for Flink's StatefulDoFnRunner to the primary StatefulDoFnRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f7621472
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f7621472
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f7621472

Branch: refs/heads/master
Commit: f7621472826c82b355f9affb0bebfeef4f550004
Parents: 9ad01fb
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Mar 13 10:20:13 2017 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Mar 14 11:51:33 2017 +0100

----------------------------------------------------------------------
 .../beam/runners/core/StatefulDoFnRunner.java   |  96 ++++++++++++++++
 .../runners/core/StatefulDoFnRunnerTest.java    | 113 ++++---------------
 .../wrappers/streaming/DoFnOperator.java        |  95 +---------------
 .../flink/streaming/DoFnOperatorTest.java       |   9 +-
 4 files changed, 126 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f7621472/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index d27193c..4f15822 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -17,8 +17,12 @@
  */
 package org.apache.beam.runners.core;
 
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -26,6 +30,8 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateSpec;
 import org.joda.time.Instant;
 
 /**
@@ -168,4 +174,94 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
 
     void clearForWindow(W window);
   }
+
+  /**
+   * A {@link StatefulDoFnRunner.CleanupTimer} implemented via {@link TimerInternals}.
+   */
+  public static class TimeInternalsCleanupTimer implements StatefulDoFnRunner.CleanupTimer {
+
+    public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
+
+    /**
+     * The amount of milliseconds by which to delay cleanup. We use this to ensure that state is
+     * still available when a user timer for {@code window.maxTimestamp()} fires.
+     */
+    public static final long GC_DELAY_MS = 1;
+
+    private final TimerInternals timerInternals;
+    private final WindowingStrategy<?, ?> windowingStrategy;
+    private final Coder<BoundedWindow> windowCoder;
+
+    public TimeInternalsCleanupTimer(
+        TimerInternals timerInternals,
+        WindowingStrategy<?, ?> windowingStrategy) {
+      this.windowingStrategy = windowingStrategy;
+      WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
+      windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder();
+      this.timerInternals = timerInternals;
+    }
+
+    @Override
+    public Instant currentInputWatermarkTime() {
+      return timerInternals.currentInputWatermarkTime();
+    }
+
+    @Override
+    public void setForWindow(BoundedWindow window) {
+      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      // make sure this fires after any window.maxTimestamp() timers
+      gcTime = gcTime.plus(GC_DELAY_MS);
+      timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
+          GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
+    }
+
+    @Override
+    public boolean isForWindow(
+        String timerId,
+        BoundedWindow window,
+        Instant timestamp,
+        TimeDomain timeDomain) {
+      boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
+      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      gcTime = gcTime.plus(GC_DELAY_MS);
+      return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
+    }
+  }
+
+  /**
+   * A {@link StatefulDoFnRunner.StateCleaner} implemented via {@link StateInternals}.
+   */
+  public static class StateInternalsStateCleaner<W extends BoundedWindow>
+      implements StatefulDoFnRunner.StateCleaner<W> {
+
+    private final DoFn<?, ?> fn;
+    private final DoFnSignature signature;
+    private final StateInternals<?> stateInternals;
+    private final Coder<W> windowCoder;
+
+    public StateInternalsStateCleaner(
+        DoFn<?, ?> fn,
+        StateInternals<?> stateInternals,
+        Coder<W> windowCoder) {
+      this.fn = fn;
+      this.signature = DoFnSignatures.getSignature(fn.getClass());
+      this.stateInternals = stateInternals;
+      this.windowCoder = windowCoder;
+    }
+
+    @Override
+    public void clearForWindow(W window) {
+      for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
+          signature.stateDeclarations().entrySet()) {
+        try {
+          StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn);
+          State state = stateInternals.state(StateNamespaces.window(windowCoder, window),
+              StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
+          state.clear();
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f7621472/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index fd6a73c..46cbd7d 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.when;
 
 import com.google.common.base.MoreObjects;
 import java.util.Collections;
-import java.util.Map;
 import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -32,18 +31,13 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.NullSideInputReader;
-import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateSpecs;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -124,8 +118,8 @@ public class StatefulDoFnRunnerTest {
         mockStepContext,
         aggregatorFactory,
         WINDOWING_STRATEGY,
-        new TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
-        new StateInternalsStateCleaner<>(
+        new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
+        new StatefulDoFnRunner.StateInternalsStateCleaner<>(
             fn, stateInternals, (Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder()));
 
     runner.startBundle();
@@ -153,8 +147,8 @@ public class StatefulDoFnRunnerTest {
         mockStepContext,
         aggregatorFactory,
         WINDOWING_STRATEGY,
-        new TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
-        new StateInternalsStateCleaner<>(
+        new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
+        new StatefulDoFnRunner.StateInternalsStateCleaner<>(
             fn, stateInternals, (Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder()));
 
     Instant elementTime = new Instant(1);
@@ -179,8 +173,16 @@ public class StatefulDoFnRunnerTest {
         2, (int) stateInternals.state(windowNamespace(WINDOW_2), stateTag).read());
 
     // advance watermark past end of WINDOW_1 + allowed lateness
+    // the cleanup timer is set to window.maxTimestamp() + allowed lateness + 1
+    // to ensure that state is still available when a user timer for window.maxTimestamp() fires
     advanceInputWatermark(
-        timerInternals, WINDOW_1.maxTimestamp().plus(ALLOWED_LATENESS + 1), runner);
+        timerInternals,
+        WINDOW_1.maxTimestamp()
+            .plus(ALLOWED_LATENESS)
+            .plus(StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_DELAY_MS)
+            .plus(1), // so the watermark is past the GC horizon, not on it
+        runner);
+
     assertTrue(
         stateInternals.isEmptyForTesting(
             stateInternals.state(windowNamespace(WINDOW_1), stateTag)));
@@ -190,7 +192,13 @@ public class StatefulDoFnRunnerTest {
 
     // advance watermark past end of WINDOW_2 + allowed lateness
     advanceInputWatermark(
-        timerInternals, WINDOW_2.maxTimestamp().plus(ALLOWED_LATENESS + 1), runner);
+        timerInternals,
+        WINDOW_2.maxTimestamp()
+            .plus(ALLOWED_LATENESS)
+            .plus(StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_DELAY_MS)
+            .plus(1), // so the watermark is past the GC horizon, not on it
+        runner);
+
     assertTrue(
         stateInternals.isEmptyForTesting(
             stateInternals.state(windowNamespace(WINDOW_2), stateTag)));
@@ -263,85 +271,4 @@ public class StatefulDoFnRunnerTest {
       return Sum.ofLongs();
     }
   }
-
-  /**
-   * A {@link StatefulDoFnRunner.CleanupTimer} implemented by TimerInternals.
-   */
-  public static class TimeInternalsCleanupTimer implements StatefulDoFnRunner.CleanupTimer {
-
-    public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
-
-    private final TimerInternals timerInternals;
-    private final WindowingStrategy<?, ?> windowingStrategy;
-    private final Coder<BoundedWindow> windowCoder;
-
-    public TimeInternalsCleanupTimer(
-        TimerInternals timerInternals,
-        WindowingStrategy<?, ?> windowingStrategy) {
-      this.windowingStrategy = windowingStrategy;
-      WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
-      windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder();
-      this.timerInternals = timerInternals;
-    }
-
-    @Override
-    public Instant currentInputWatermarkTime() {
-      return timerInternals.currentInputWatermarkTime();
-    }
-
-    @Override
-    public void setForWindow(BoundedWindow window) {
-      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
-      timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
-          GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
-    }
-
-    @Override
-    public boolean isForWindow(
-        String timerId,
-        BoundedWindow window,
-        Instant timestamp,
-        TimeDomain timeDomain) {
-      boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
-      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
-      return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
-    }
-  }
-
-  /**
-   * A {@link StatefulDoFnRunner.StateCleaner} implemented by StateInternals.
-   */
-  public static class StateInternalsStateCleaner<W extends BoundedWindow>
-      implements StatefulDoFnRunner.StateCleaner<W> {
-
-    private final DoFn<?, ?> fn;
-    private final DoFnSignature signature;
-    private final StateInternals<?> stateInternals;
-    private final Coder<W> windowCoder;
-
-    public StateInternalsStateCleaner(
-        DoFn<?, ?> fn,
-        StateInternals<?> stateInternals,
-        Coder<W> windowCoder) {
-      this.fn = fn;
-      this.signature = DoFnSignatures.getSignature(fn.getClass());
-      this.stateInternals = stateInternals;
-      this.windowCoder = windowCoder;
-    }
-
-    @Override
-    public void clearForWindow(W window) {
-      for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
-          signature.stateDeclarations().entrySet()) {
-        try {
-          StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn);
-          State state = stateInternals.state(StateNamespaces.window(windowCoder, window),
-              StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
-          state.clear();
-        } catch (IllegalAccessException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f7621472/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index a8ce680..9a66a2f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -62,18 +62,13 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -302,7 +297,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       // It is a stateful DoFn
 
       StatefulDoFnRunner.CleanupTimer cleanupTimer =
-          new TimeInternalsCleanupTimer(stepContext.timerInternals(), windowingStrategy);
+          new StatefulDoFnRunner.TimeInternalsCleanupTimer(
+              stepContext.timerInternals(), windowingStrategy);
 
       // we don't know the window type
       @SuppressWarnings({"unchecked", "rawtypes"})
@@ -310,7 +306,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
       @SuppressWarnings({"unchecked", "rawtypes"})
       StatefulDoFnRunner.StateCleaner<?> stateCleaner =
-          new StateInternalsStateCleaner<>(
+          new StatefulDoFnRunner.StateInternalsStateCleaner<>(
               doFn, stepContext.stateInternals(), windowCoder);
 
       doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
@@ -773,89 +769,4 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       return new Instant(currentOutputWatermark);
     }
   }
-
-
-  /**
-   * A {@link StatefulDoFnRunner.CleanupTimer} implemented by TimerInternals.
-   */
-  public static class TimeInternalsCleanupTimer implements StatefulDoFnRunner.CleanupTimer {
-
-    public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
-
-    private final TimerInternals timerInternals;
-    private final WindowingStrategy<?, ?> windowingStrategy;
-    private final Coder<BoundedWindow> windowCoder;
-
-    public TimeInternalsCleanupTimer(
-        TimerInternals timerInternals,
-        WindowingStrategy<?, ?> windowingStrategy) {
-      this.windowingStrategy = windowingStrategy;
-      WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
-      windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder();
-      this.timerInternals = timerInternals;
-    }
-
-    @Override
-    public Instant currentInputWatermarkTime() {
-      return timerInternals.currentInputWatermarkTime();
-    }
-
-    @Override
-    public void setForWindow(BoundedWindow window) {
-      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
-      // make sure this fires after any window.maxTimestamp() timers
-      gcTime = gcTime.plus(1L);
-      timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
-          GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
-    }
-
-    @Override
-    public boolean isForWindow(
-        String timerId,
-        BoundedWindow window,
-        Instant timestamp,
-        TimeDomain timeDomain) {
-      boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
-      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
-      gcTime = gcTime.plus(1L);
-      return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
-    }
-  }
-
-  /**
-   * A {@link StatefulDoFnRunner.StateCleaner} implemented by StateInternals.
-   */
-  public static class StateInternalsStateCleaner<W extends BoundedWindow>
-      implements StatefulDoFnRunner.StateCleaner<W> {
-
-    private final DoFn<?, ?> fn;
-    private final DoFnSignature signature;
-    private final StateInternals<?> stateInternals;
-    private final Coder<W> windowCoder;
-
-    public StateInternalsStateCleaner(
-        DoFn<?, ?> fn,
-        StateInternals<?> stateInternals,
-        Coder<W> windowCoder) {
-      this.fn = fn;
-      this.signature = DoFnSignatures.getSignature(fn.getClass());
-      this.stateInternals = stateInternals;
-      this.windowCoder = windowCoder;
-    }
-
-    @Override
-    public void clearForWindow(W window) {
-      for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
-          signature.stateDeclarations().entrySet()) {
-        try {
-          StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn);
-          State state = stateInternals.state(StateNamespaces.window(windowCoder, window),
-              StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
-          state.clear();
-        } catch (IllegalAccessException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f7621472/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index bbd3428..25154fa 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.HashMap;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
@@ -276,7 +277,7 @@ public class DoFnOperatorTest {
   public void testStateGCForStatefulFn() throws Exception {
 
     WindowingStrategy<Object, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(new Duration(10)));
+        WindowingStrategy.of(FixedWindows.of(new Duration(10))).withAllowedLateness(Duration.ZERO);
 
     final String timerId = "boo";
     final String stateId = "dazzle";
@@ -378,7 +379,11 @@ public class DoFnOperatorTest {
 
     // this should trigger both the window.maxTimestamp() timer and the GC timer
     // this tests that the GC timer fires after the user timer
-    testHarness.processWatermark(15);
+    testHarness.processWatermark(
+        window1.maxTimestamp()
+            .plus(windowingStrategy.getAllowedLateness())
+            .plus(StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_DELAY_MS)
+            .getMillis());
 
     assertThat(
         this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()),


Mime
View raw message