beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/4] beam git commit: Introduce Flink-specific state GC implementations
Date Fri, 10 Mar 2017 21:02:19 GMT
Repository: beam
Updated Branches:
  refs/heads/master 54390a333 -> 75fe559a4


Introduce Flink-specific state GC implementations

We now set the GC timer for window.maxTimestamp() + 1 to ensure that a
user timer set for window.maxTimestamp() still has all state.

This also adds tests for late data dropping and state GC specifically
for the Flink DoFnOperator.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a8e1f74
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a8e1f74
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a8e1f74

Branch: refs/heads/master
Commit: 1a8e1f7463cbc7c6b5edfe1dbbc98502e5612511
Parents: bf6d274
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Fri Mar 10 11:07:00 2017 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Mar 10 11:09:04 2017 +0100

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnRunners.java   |  15 +-
 .../beam/runners/core/StatefulDoFnRunner.java   |  87 -------
 .../runners/core/StatefulDoFnRunnerTest.java    | 110 ++++++++-
 .../wrappers/streaming/DoFnOperator.java        | 111 ++++++++-
 .../flink/streaming/DoFnOperatorTest.java       | 225 +++++++++++++++++++
 5 files changed, 439 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1a8e1f74/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 9455eea..a1b7c8b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -21,9 +21,6 @@ import java.util.List;
 import org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer;
 import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;
-import org.apache.beam.runners.core.StatefulDoFnRunner.StateInternalsStateCleaner;
-import org.apache.beam.runners.core.StatefulDoFnRunner.TimeInternalsCleanupTimer;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -135,18 +132,13 @@ public class DoFnRunners {
           DoFnRunner<InputT, OutputT> doFnRunner,
           StepContext stepContext,
           AggregatorFactory aggregatorFactory,
-          WindowingStrategy<?, ?> windowingStrategy) {
+          WindowingStrategy<?, ?> windowingStrategy,
+          CleanupTimer cleanupTimer,
+          StateCleaner<W> stateCleaner) {
     Aggregator<Long, Long> droppedDueToLateness = aggregatorFactory.createAggregatorForDoFn(
         fn.getClass(), stepContext, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER,
         Sum.ofLongs());
 
-    CleanupTimer cleanupTimer =
-        new TimeInternalsCleanupTimer(stepContext.timerInternals(), windowingStrategy);
-
-    Coder<W> windowCoder = (Coder<W>) windowingStrategy.getWindowFn().windowCoder();
-    StateCleaner<W> stateCleaner =
-        new StateInternalsStateCleaner<>(fn, stepContext.stateInternals(), windowCoder);
-
     return new StatefulDoFnRunner<>(
         doFnRunner,
         windowingStrategy,
@@ -154,5 +146,4 @@ public class DoFnRunners {
         stateCleaner,
         droppedDueToLateness);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/1a8e1f74/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 926345e..c672902 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -17,12 +17,8 @@
  */
 package org.apache.beam.runners.core;
 
-import java.util.Map;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -30,8 +26,6 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateSpec;
 import org.joda.time.Instant;
 
 /**
@@ -45,7 +39,6 @@ import org.joda.time.Instant;
 public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
     implements DoFnRunner<InputT, OutputT> {
 
-  public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
   public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "StatefulParDoDropped";
 
   private final DoFnRunner<InputT, OutputT> doFnRunner;
@@ -167,84 +160,4 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
 
     void clearForWindow(W window);
   }
-
-  /**
-   * A {@link CleanupTimer} implemented by TimerInternals.
-   */
-  public static class TimeInternalsCleanupTimer implements CleanupTimer {
-
-    private final TimerInternals timerInternals;
-    private final WindowingStrategy<?, ?> windowingStrategy;
-    private final Coder<BoundedWindow> windowCoder;
-
-    public TimeInternalsCleanupTimer(
-        TimerInternals timerInternals,
-        WindowingStrategy<?, ?> windowingStrategy) {
-      this.windowingStrategy = windowingStrategy;
-      WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
-      windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder();
-      this.timerInternals = timerInternals;
-    }
-
-    @Override
-    public Instant currentInputWatermarkTime() {
-      return timerInternals.currentInputWatermarkTime();
-    }
-
-    @Override
-    public void setForWindow(BoundedWindow window) {
-      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
-      timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
-          GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
-    }
-
-    @Override
-    public boolean isForWindow(
-        String timerId,
-        BoundedWindow window,
-        Instant timestamp,
-        TimeDomain timeDomain) {
-      boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
-      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
-      return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
-    }
-  }
-
-  /**
-   * A {@link StateCleaner} implemented by StateInternals.
-   */
-  public static class StateInternalsStateCleaner<W extends BoundedWindow>
-      implements StateCleaner<W> {
-
-    private final DoFn<?, ?> fn;
-    private final DoFnSignature signature;
-    private final StateInternals<?> stateInternals;
-    private final Coder<W> windowCoder;
-
-    public StateInternalsStateCleaner(
-        DoFn<?, ?> fn,
-        StateInternals<?> stateInternals,
-        Coder<W> windowCoder) {
-      this.fn = fn;
-      this.signature = DoFnSignatures.getSignature(fn.getClass());
-      this.stateInternals = stateInternals;
-      this.windowCoder = windowCoder;
-    }
-
-    @Override
-    public void clearForWindow(W window) {
-      for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
-          signature.stateDeclarations().entrySet()) {
-        try {
-          StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn);
-          State state = stateInternals.state(StateNamespaces.window(windowCoder, window),
-              StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
-          state.clear();
-        } catch (IllegalAccessException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/1a8e1f74/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index 54ac77e..fd6a73c 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
 
 import com.google.common.base.MoreObjects;
 import java.util.Collections;
+import java.util.Map;
 import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -31,14 +32,18 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateSpecs;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -114,7 +119,14 @@ public class StatefulDoFnRunnerTest {
     DoFn<KV<String, Integer>, Integer> fn = new MyDoFn();
 
     DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
-        fn, getDoFnRunner(fn), mockStepContext, aggregatorFactory, WINDOWING_STRATEGY);
+        fn,
+        getDoFnRunner(fn),
+        mockStepContext,
+        aggregatorFactory,
+        WINDOWING_STRATEGY,
+        new TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
+        new StateInternalsStateCleaner<>(
+            fn, stateInternals, (Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder()));
 
     runner.startBundle();
 
@@ -125,13 +137,6 @@ public class StatefulDoFnRunnerTest {
         WindowedValue.of(KV.of("hello", 1), timestamp, window, PaneInfo.NO_FIRING));
     assertEquals(1L, droppedDueToLateness.sum);
 
-    runner.onTimer("processTimer", window, timestamp, TimeDomain.PROCESSING_TIME);
-    assertEquals(2L, droppedDueToLateness.sum);
-
-    runner.onTimer("synchronizedProcessTimer", window, timestamp,
-        TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    assertEquals(3L, droppedDueToLateness.sum);
-
     runner.finishBundle();
   }
 
@@ -143,7 +148,14 @@ public class StatefulDoFnRunnerTest {
     StateTag<Object, ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState);
 
     DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
-        fn, getDoFnRunner(fn), mockStepContext, aggregatorFactory, WINDOWING_STRATEGY);
+        fn,
+        getDoFnRunner(fn),
+        mockStepContext,
+        aggregatorFactory,
+        WINDOWING_STRATEGY,
+        new TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
+        new StateInternalsStateCleaner<>(
+            fn, stateInternals, (Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder()));
 
     Instant elementTime = new Instant(1);
 
@@ -252,4 +264,84 @@ public class StatefulDoFnRunnerTest {
     }
   }
 
+  /**
+   * A {@link StatefulDoFnRunner.CleanupTimer} implemented by TimerInternals.
+   */
+  public static class TimeInternalsCleanupTimer implements StatefulDoFnRunner.CleanupTimer {
+
+    public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
+
+    private final TimerInternals timerInternals;
+    private final WindowingStrategy<?, ?> windowingStrategy;
+    private final Coder<BoundedWindow> windowCoder;
+
+    public TimeInternalsCleanupTimer(
+        TimerInternals timerInternals,
+        WindowingStrategy<?, ?> windowingStrategy) {
+      this.windowingStrategy = windowingStrategy;
+      WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
+      windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder();
+      this.timerInternals = timerInternals;
+    }
+
+    @Override
+    public Instant currentInputWatermarkTime() {
+      return timerInternals.currentInputWatermarkTime();
+    }
+
+    @Override
+    public void setForWindow(BoundedWindow window) {
+      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
+          GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
+    }
+
+    @Override
+    public boolean isForWindow(
+        String timerId,
+        BoundedWindow window,
+        Instant timestamp,
+        TimeDomain timeDomain) {
+      boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
+      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
+    }
+  }
+
+  /**
+   * A {@link StatefulDoFnRunner.StateCleaner} implemented by StateInternals.
+   */
+  public static class StateInternalsStateCleaner<W extends BoundedWindow>
+      implements StatefulDoFnRunner.StateCleaner<W> {
+
+    private final DoFn<?, ?> fn;
+    private final DoFnSignature signature;
+    private final StateInternals<?> stateInternals;
+    private final Coder<W> windowCoder;
+
+    public StateInternalsStateCleaner(
+        DoFn<?, ?> fn,
+        StateInternals<?> stateInternals,
+        Coder<W> windowCoder) {
+      this.fn = fn;
+      this.signature = DoFnSignatures.getSignature(fn.getClass());
+      this.stateInternals = stateInternals;
+      this.windowCoder = windowCoder;
+    }
+
+    @Override
+    public void clearForWindow(W window) {
+      for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
+          signature.stateDeclarations().entrySet()) {
+        try {
+          StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn);
+          State state = stateInternals.state(StateNamespaces.window(windowCoder, window),
+              StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
+          state.clear();
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/1a8e1f74/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index c4622ba..a8ce680 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -43,6 +43,7 @@ import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
@@ -61,13 +62,18 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -286,6 +292,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       //
       // for some K, V
 
+
       doFnRunner = DoFnRunners.lateDataDroppingRunner(
           (DoFnRunner) doFnRunner,
           stepContext,
@@ -293,8 +300,27 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
           ((GroupAlsoByWindowViaWindowSetNewDoFn) doFn).getDroppedDueToLatenessAggregator());
     } else if (keyCoder != null) {
       // It is a stateful DoFn
+
+      StatefulDoFnRunner.CleanupTimer cleanupTimer =
+          new TimeInternalsCleanupTimer(stepContext.timerInternals(), windowingStrategy);
+
+      // we don't know the window type
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
+
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      StatefulDoFnRunner.StateCleaner<?> stateCleaner =
+          new StateInternalsStateCleaner<>(
+              doFn, stepContext.stateInternals(), windowCoder);
+
       doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
-          doFn, doFnRunner, stepContext, aggregatorFactory, windowingStrategy);
+          doFn,
+          doFnRunner,
+          stepContext,
+          aggregatorFactory,
+          windowingStrategy,
+          cleanupTimer,
+          stateCleaner);
     }
 
     pushbackDoFnRunner =
@@ -746,7 +772,90 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     public Instant currentOutputWatermarkTime() {
       return new Instant(currentOutputWatermark);
     }
+  }
+
+
+  /**
+   * A {@link StatefulDoFnRunner.CleanupTimer} implemented by TimerInternals.
+   */
+  public static class TimeInternalsCleanupTimer implements StatefulDoFnRunner.CleanupTimer {
+
+    public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
+
+    private final TimerInternals timerInternals;
+    private final WindowingStrategy<?, ?> windowingStrategy;
+    private final Coder<BoundedWindow> windowCoder;
+
+    public TimeInternalsCleanupTimer(
+        TimerInternals timerInternals,
+        WindowingStrategy<?, ?> windowingStrategy) {
+      this.windowingStrategy = windowingStrategy;
+      WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
+      windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder();
+      this.timerInternals = timerInternals;
+    }
 
+    @Override
+    public Instant currentInputWatermarkTime() {
+      return timerInternals.currentInputWatermarkTime();
+    }
+
+    @Override
+    public void setForWindow(BoundedWindow window) {
+      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      // make sure this fires after any window.maxTimestamp() timers
+      gcTime = gcTime.plus(1L);
+      timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
+          GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
+    }
+
+    @Override
+    public boolean isForWindow(
+        String timerId,
+        BoundedWindow window,
+        Instant timestamp,
+        TimeDomain timeDomain) {
+      boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
+      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      gcTime = gcTime.plus(1L);
+      return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
+    }
   }
 
+  /**
+   * A {@link StatefulDoFnRunner.StateCleaner} implemented by StateInternals.
+   */
+  public static class StateInternalsStateCleaner<W extends BoundedWindow>
+      implements StatefulDoFnRunner.StateCleaner<W> {
+
+    private final DoFn<?, ?> fn;
+    private final DoFnSignature signature;
+    private final StateInternals<?> stateInternals;
+    private final Coder<W> windowCoder;
+
+    public StateInternalsStateCleaner(
+        DoFn<?, ?> fn,
+        StateInternals<?> stateInternals,
+        Coder<W> windowCoder) {
+      this.fn = fn;
+      this.signature = DoFnSignatures.getSignature(fn.getClass());
+      this.stateInternals = stateInternals;
+      this.windowCoder = windowCoder;
+    }
+
+    @Override
+    public void clearForWindow(W window) {
+      for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
+          signature.stateDeclarations().entrySet()) {
+        try {
+          StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn);
+          State state = stateInternals.state(StateNamespaces.window(windowCoder, window),
+              StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
+          state.clear();
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/1a8e1f74/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 7d14a87..bbd3428 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -17,7 +17,9 @@
  */
 package org.apache.beam.runners.flink.streaming;
 
+import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.base.Function;
@@ -29,9 +31,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PCollectionViewTesting;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -40,14 +45,23 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
@@ -169,6 +183,217 @@ public class DoFnOperatorTest {
     testHarness.close();
   }
 
+  @Test
+  public void testLateDroppingForStatefulFn() throws Exception {
+
+    WindowingStrategy<Object, IntervalWindow> windowingStrategy =
+        WindowingStrategy.of(FixedWindows.of(new Duration(10)));
+
+    DoFn<Integer, String> fn = new DoFn<Integer, String>() {
+
+      @StateId("state")
+      private final StateSpec<Object, ValueState<String>> stateSpec =
+          StateSpecs.value(StringUtf8Coder.of());
+
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        context.output(context.element().toString());
+      }
+    };
+
+    WindowedValue.FullWindowedValueCoder<Integer> windowedValueCoder =
+        WindowedValue.getFullCoder(
+            VarIntCoder.of(),
+            windowingStrategy.getWindowFn().windowCoder());
+
+    TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+    DoFnOperator<Integer, String, WindowedValue<String>> doFnOperator = new DoFnOperator<>(
+        fn,
+        windowedValueCoder,
+        outputTag,
+        Collections.<TupleTag<?>>emptyList(),
+        new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<String>>(),
+        windowingStrategy,
+        new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+        Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+        PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+        VarIntCoder.of() /* key coder */);
+
+    OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<String>> testHarness =
+        new KeyedOneInputStreamOperatorTestHarness<>(
+            doFnOperator,
+            new KeySelector<WindowedValue<Integer>, Integer>() {
+              @Override
+              public Integer getKey(WindowedValue<Integer> integerWindowedValue) throws Exception {
+                return integerWindowedValue.getValue();
+              }
+            },
+            new CoderTypeInformation<>(VarIntCoder.of()));
+
+    testHarness.open();
+
+    testHarness.processWatermark(0);
+
+    IntervalWindow window1 = new IntervalWindow(new Instant(0), Duration.millis(10));
+
+    // this should not be late
+    testHarness.processElement(
+        new StreamRecord<>(WindowedValue.of(13, new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(WindowedValue.of("13", new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+    testHarness.getOutput().clear();
+
+    testHarness.processWatermark(9);
+
+    // this should still not be considered late
+    testHarness.processElement(
+        new StreamRecord<>(WindowedValue.of(17, new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(WindowedValue.of("17", new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+    testHarness.getOutput().clear();
+
+    testHarness.processWatermark(10);
+
+    // this should now be considered late
+    testHarness.processElement(
+        new StreamRecord<>(WindowedValue.of(17, new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        emptyIterable());
+
+    testHarness.close();
+  }
+
+  @Test
+  public void testStateGCForStatefulFn() throws Exception {
+
+    WindowingStrategy<Object, IntervalWindow> windowingStrategy =
+        WindowingStrategy.of(FixedWindows.of(new Duration(10)));
+
+    final String timerId = "boo";
+    final String stateId = "dazzle";
+
+    final int offset = 5000;
+    final int timerOutput = 4093;
+
+    DoFn<KV<String, Integer>, KV<String, Integer>> fn =
+        new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @StateId(stateId)
+          private final StateSpec<Object, ValueState<String>> stateSpec =
+              StateSpecs.value(StringUtf8Coder.of());
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext context,
+              @TimerId(timerId) Timer timer,
+              @StateId(stateId) ValueState<String> state,
+              BoundedWindow window) {
+            timer.set(window.maxTimestamp());
+            state.write(context.element().getKey());
+            context.output(
+                KV.of(context.element().getKey(), context.element().getValue() + offset));
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context, @StateId(stateId) ValueState<String> state) {
+            context.output(KV.of(state.read(), timerOutput));
+          }
+        };
+
+    WindowedValue.FullWindowedValueCoder<KV<String, Integer>> windowedValueCoder =
+        WindowedValue.getFullCoder(
+            KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
+            windowingStrategy.getWindowFn().windowCoder());
+
+    TupleTag<KV<String, Integer>> outputTag = new TupleTag<>("main-output");
+
+    DoFnOperator<
+        KV<String, Integer>, KV<String, Integer>, WindowedValue<KV<String, Integer>>> doFnOperator =
+        new DoFnOperator<>(
+            fn,
+            windowedValueCoder,
+            outputTag,
+            Collections.<TupleTag<?>>emptyList(),
+            new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<String, Integer>>>(),
+            windowingStrategy,
+            new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+            Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+            PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+            StringUtf8Coder.of() /* key coder */);
+
+    KeyedOneInputStreamOperatorTestHarness<
+        String,
+        WindowedValue<KV<String, Integer>>,
+        WindowedValue<KV<String, Integer>>> testHarness =
+        new KeyedOneInputStreamOperatorTestHarness<>(
+            doFnOperator,
+            new KeySelector<WindowedValue<KV<String, Integer>>, String>() {
+              @Override
+              public String getKey(
+                  WindowedValue<KV<String, Integer>> kvWindowedValue) throws Exception {
+                return kvWindowedValue.getValue().getKey();
+              }
+            },
+            new CoderTypeInformation<>(StringUtf8Coder.of()));
+
+    testHarness.open();
+
+    testHarness.processWatermark(0);
+
+    assertEquals(0, testHarness.numKeyedStateEntries());
+
+    IntervalWindow window1 = new IntervalWindow(new Instant(0), Duration.millis(10));
+
+    testHarness.processElement(
+        new StreamRecord<>(
+            WindowedValue.of(KV.of("key1", 5), new Instant(1), window1, PaneInfo.NO_FIRING)));
+
+    testHarness.processElement(
+        new StreamRecord<>(
+            WindowedValue.of(KV.of("key2", 7), new Instant(3), window1, PaneInfo.NO_FIRING)));
+
+    assertThat(
+        this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(
+            WindowedValue.of(
+                KV.of("key1", 5 + offset), new Instant(1), window1, PaneInfo.NO_FIRING),
+            WindowedValue.of(
+                KV.of("key2", 7 + offset), new Instant(3), window1, PaneInfo.NO_FIRING)));
+
+    assertEquals(2, testHarness.numKeyedStateEntries());
+
+    testHarness.getOutput().clear();
+
+    // this should trigger both the window.maxTimestamp() timer and the GC timer
+    // this tests that the GC timer fires after the user timer
+    testHarness.processWatermark(15);
+
+    assertThat(
+        this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(
+            WindowedValue.of(
+                KV.of("key1", timerOutput), new Instant(9), window1, PaneInfo.NO_FIRING),
+            WindowedValue.of(
+                KV.of("key2", timerOutput), new Instant(9), window1, PaneInfo.NO_FIRING)));
+
+    // ensure the state was garbage collected
+    assertEquals(0, testHarness.numKeyedStateEntries());
+
+    testHarness.close();
+  }
+
   public void testSideInputs(boolean keyed) throws Exception {
 
     WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =


Mime
View raw message