beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/3] incubator-beam git commit: [BEAM-80] Decide EARLY or ON_TIME based on input watermark
Date Thu, 03 Mar 2016 03:05:42 GMT
[BEAM-80] Decide EARLY or ON_TIME based on input watermark

This change is based on trigger specs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/968494f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/968494f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/968494f3

Branch: refs/heads/master
Commit: 968494f3e427cc242d91ef6de25f5c7c408540dc
Parents: f87f35b
Author: Pei He <peihe0@gmail.com>
Authored: Wed Mar 2 16:28:43 2016 -0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed Mar 2 19:01:09 2016 -0800

----------------------------------------------------------------------
 .../dataflow/sdk/util/PaneInfoTracker.java      | 30 +++++++++----------
 .../cloud/dataflow/sdk/util/ReduceFnRunner.java | 31 ++++++++------------
 .../dataflow/sdk/util/ReduceFnRunnerTest.java   |  9 +++---
 3 files changed, 32 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/968494f3/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
index 38499c2..a7818a3 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
@@ -54,13 +54,11 @@ public class PaneInfoTracker {
    * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
    * info includes the timing for the pane, whose calculation is quite subtle.
    *
-   * @param isEndOfWindow should be {@code true} only if the pane is being emitted
-   * because an end-of-window timer has fired and the trigger agreed we should fire.
    * @param isFinal should be {@code true} only if the triggering machinery can guarantee
    * no further firings for the
    */
-  public ReadableState<PaneInfo> getNextPaneInfo(ReduceFn<?, ?, ?, ?>.Context context,
-      final boolean isEndOfWindow, final boolean isFinal) {
+  public ReadableState<PaneInfo> getNextPaneInfo(
+      ReduceFn<?, ?, ?, ?>.Context context, final boolean isFinal) {
     final Object key = context.key();
     final ReadableState<PaneInfo> previousPaneFuture =
         context.state().access(PaneInfoTracker.PANE_INFO_TAG);
@@ -76,7 +74,7 @@ public class PaneInfoTracker {
       @Override
       public PaneInfo read() {
         PaneInfo previousPane = previousPaneFuture.read();
-        return describePane(key, windowMaxTimestamp, previousPane, isEndOfWindow, isFinal);
+        return describePane(key, windowMaxTimestamp, previousPane, isFinal);
       }
     };
   }
@@ -85,8 +83,8 @@ public class PaneInfoTracker {
     context.state().access(PANE_INFO_TAG).write(currentPane);
   }
 
-  private <W> PaneInfo describePane(Object key, Instant windowMaxTimestamp, PaneInfo previousPane,
-      boolean isEndOfWindow, boolean isFinal) {
+  private <W> PaneInfo describePane(
+      Object key, Instant windowMaxTimestamp, PaneInfo previousPane, boolean isFinal) {
     boolean isFirst = previousPane == null;
     Timing previousTiming = isFirst ? null : previousPane.getTiming();
     long index = isFirst ? 0 : previousPane.getIndex() + 1;
@@ -104,26 +102,28 @@ public class PaneInfoTracker {
     // if the output watermark is behind the end of the window.
     boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;
 
+    // True if the input watermark hasn't passed the window's max timestamp.
+    boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp);
+
     Timing timing;
     if (isLateForOutput || !onlyEarlyPanesSoFar) {
       // The output watermark has already passed the end of this window, or we have already
       // emitted a non-EARLY pane. Irrespective of how this pane was triggered we must
       // consider this pane LATE.
       timing = Timing.LATE;
-    } else if (isEndOfWindow) {
-      // This is the unique ON_TIME firing for the window.
-      timing = Timing.ON_TIME;
-    } else {
-      // All other cases are EARLY.
+    } else if (isEarlyForInput) {
+      // This is an EARLY firing.
       timing = Timing.EARLY;
       nonSpeculativeIndex = -1;
+    } else {
+      // This is the unique ON_TIME firing for the window.
+      timing = Timing.ON_TIME;
     }
 
     WindowTracing.debug(
         "describePane: {} pane (prev was {}) for key:{}; windowMaxTimestamp:{}; "
-        + "inputWatermark:{}; outputWatermark:{}; isEndOfWindow:{}; isLateForOutput:{}",
-        timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isEndOfWindow,
-        isLateForOutput);
+        + "inputWatermark:{}; outputWatermark:{}; isLateForOutput:{}",
+        timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isLateForOutput);
 
     if (previousPane != null) {
       // Timing transitions should follow EARLY* ON_TIME? LATE*

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/968494f3/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index fe5c474..1a009bb 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -289,7 +289,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
           contextFactory.base(mergedWindow, StateStyle.RENAMED);
       triggerRunner.prefetchShouldFire(mergedWindow, directContext.state());
-      emitIfAppropriate(directContext, renamedContext, false/* isEndOfWindow */);
+      emitIfAppropriate(directContext, renamedContext);
     }
 
     // We're all done with merging and emitting elements so can compress the activeWindow state.
@@ -532,14 +532,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
           "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
     }
 
-    // If this is an end-of-window timer then:
-    // 1. We need to set a GC timer
-    // 2. We need to let the PaneInfoTracker know that we are transitioning from early to late,
-    // and possibly emitting an on-time pane.
-    boolean isEndOfWindow =
-        TimeDomain.EVENT_TIME == timer.getDomain()
-        && timer.getTimestamp().equals(window.maxTimestamp());
-
     // If this is a garbage collection timer then we should trigger and garbage collect the window.
     Instant cleanupTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
     boolean isGarbageCollection =
@@ -556,7 +548,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
         // We need to call onTrigger to emit the final pane if required.
         // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
         // and the watermark has passed the end of the window.
-        onTrigger(directContext, renamedContext, isEndOfWindow, true/* isFinished */);
+        onTrigger(directContext, renamedContext, true/* isFinished */);
       }
 
       // Cleanup flavor B: Clear all the remaining state for this window since we'll never
@@ -569,9 +561,12 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
           key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
           timerInternals.currentOutputWatermarkTime());
       if (windowIsActive) {
-        emitIfAppropriate(directContext, renamedContext, isEndOfWindow);
+        emitIfAppropriate(directContext, renamedContext);
       }
 
+      // If this is an end-of-window timer, then we need to set a GC timer
+      boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
+          && timer.getTimestamp().equals(window.maxTimestamp());
       if (isEndOfWindow) {
         // Since we are processing an on-time firing we should schedule the garbage collection
         // timer. (If getAllowedLateness is zero then the timer event will be considered a
@@ -649,7 +644,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
    * Possibly emit a pane if a trigger is ready to fire or timers require it, and cleanup state.
    */
   private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directContext,
-      ReduceFn<K, InputT, OutputT, W>.Context renamedContext, boolean isEndOfWindow)
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
       throws Exception {
     if (!triggerRunner.shouldFire(
         directContext.window(), directContext.timers(), directContext.state())) {
@@ -667,7 +662,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     // Run onTrigger to produce the actual pane contents.
     // As a side effect it will clear all element holds, but not necessarily any
     // end-of-window or garbage collection holds.
-    onTrigger(directContext, renamedContext, isEndOfWindow, isFinished);
+    onTrigger(directContext, renamedContext, isFinished);
 
     // Now that we've triggered, the pane is empty.
     nonEmptyPanes.clearPane(renamedContext.state());
@@ -692,13 +687,12 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
   /**
    * Do we need to emit a pane?
    */
-  private boolean needToEmit(
-      boolean isEmpty, boolean isEndOfWindow, boolean isFinished, PaneInfo.Timing timing) {
+  private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
     if (!isEmpty) {
       // The pane has elements.
       return true;
     }
-    if (isEndOfWindow && timing == Timing.ON_TIME) {
+    if (timing == Timing.ON_TIME) {
       // This is the unique ON_TIME pane.
       return true;
     }
@@ -715,14 +709,13 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
   private void onTrigger(
       final ReduceFn<K, InputT, OutputT, W>.Context directContext,
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
-      boolean isEndOfWindow,
       boolean isFinished)
           throws Exception {
     // Prefetch necessary states
     ReadableState<Instant> outputTimestampFuture =
         watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
     ReadableState<PaneInfo> paneFuture =
-        paneInfoTracker.getNextPaneInfo(directContext, isEndOfWindow, isFinished).readLater();
+        paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
     ReadableState<Boolean> isEmptyFuture =
         nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
 
@@ -735,7 +728,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     final Instant outputTimestamp = outputTimestampFuture.read();
 
     // Only emit a pane if it has data or empty panes are observable.
-    if (needToEmit(isEmptyFuture.read(), isEndOfWindow, isFinished, pane.getTiming())) {
+    if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
       // Run reduceFn.onTrigger method.
       final List<W> windows = Collections.singletonList(directContext.window());
       ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/968494f3/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
index c85b1ca..4fb3e37 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
@@ -479,19 +479,20 @@ public class ReduceFnRunnerTest {
     when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 3);
     assertThat(tester.extractOutput(), contains(
-        // This is late, because the trigger wasn't waiting for AfterWatermark
-        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 2, -1))));
+        WindowMatchers.valueWithPaneInfo(
+            PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0))));
 
     when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 4);
     assertThat(tester.extractOutput(), contains(
-        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 3, -1))));
+        WindowMatchers.valueWithPaneInfo(
+            PaneInfo.createPane(false, false, Timing.LATE, 3, 1))));
 
     when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
     triggerShouldFinish(mockTrigger);
     injectElement(tester, 5);
     assertThat(tester.extractOutput(), contains(
-        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.EARLY, 4, -1))));
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 4, 2))));
   }
 
   @Test


Mime
View raw message