beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: Add GC hold if have data. Don't set timers beyond GlobalWindow.maxTimestamp
Date Thu, 26 May 2016 16:07:08 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master d4c052c32 -> cca2577c6


Add GC hold if have data. Don't set timers beyond GlobalWindow.maxTimestamp


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9ab5b888
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9ab5b888
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9ab5b888

Branch: refs/heads/master
Commit: 9ab5b888998ace0fb9b8396dc0355edd44bc65f8
Parents: d4c052c
Author: Mark Shields <markshields@google.com>
Authored: Tue May 24 13:52:41 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu May 26 09:06:12 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/ReduceFnRunner.java    | 106 +++++++---
 .../org/apache/beam/sdk/util/WatermarkHold.java | 212 ++++++++++++-------
 .../beam/sdk/util/ReduceFnRunnerTest.java       | 113 ++++++++++
 3 files changed, 325 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ab5b888/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
index e916aa8..889ac6f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
@@ -22,6 +22,7 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
@@ -580,14 +581,17 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
           "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer,
window);
     }
 
-    // If this is an end-of-window timer then, we need to set a GC timer
+    // If this is an end-of-window timer then we may need to set a garbage collection timer
+    // if allowed lateness is non-zero.
     boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
-      && timer.getTimestamp().equals(window.maxTimestamp());
+        && timer.getTimestamp().equals(window.maxTimestamp());
 
     // If this is a garbage collection timer then we should trigger and garbage collect the
window.
-    Instant cleanupTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
-    boolean isGarbageCollection =
-        TimeDomain.EVENT_TIME == timer.getDomain() && timer.getTimestamp().equals(cleanupTime);
+    // We'll consider any timer at or after the garbage collection time to be a signal to
+    // garbage collect.
+    Instant cleanupTime = garbageCollectionTime(window);
+    boolean isGarbageCollection = TimeDomain.EVENT_TIME == timer.getDomain()
+        && !timer.getTimestamp().isBefore(cleanupTime);
 
     if (isGarbageCollection) {
       WindowTracing.debug(
@@ -600,7 +604,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
         // We need to call onTrigger to emit the final pane if required.
         // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
         // and the watermark has passed the end of the window.
-        onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow);
+        @Nullable Instant newHold =
+            onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow);
+        Preconditions.checkState(newHold == null);
       }
 
       // Cleanup flavor B: Clear all the remaining state for this window since we'll never
@@ -626,7 +632,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
         // timer. (If getAllowedLateness is zero then the timer event will be considered
a
         // cleanup event and handled by the above).
         // Note we must do this even if the trigger is finished so that we are sure to cleanup
-        // any final trigger tombstones.
+        // any final trigger finished bits.
         Preconditions.checkState(
             windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
             "Unexpected zero getAllowedLateness");
@@ -635,6 +641,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
             + "inputWatermark:{}; outputWatermark:{}",
             key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(),
             timerInternals.currentOutputWatermarkTime());
+        Preconditions.checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+                                 "Cleanup time %s is beyond end-of-time", cleanupTime);
         directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
       }
     }
@@ -646,7 +654,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
    * beyond allowed lateness.
    * This is a superset of the clearing done by {@link #emitIfAppropriate} below since:
    * <ol>
-   * <li>We can clear the trigger state tombstone since we'll never need to ask about
it again.
+   * <li>We can clear the trigger finished bits since we'll never need to ask if the
trigger is
+   * closed again.
    * <li>We can clear any remaining garbage collection hold.
    * </ol>
    */
@@ -661,12 +670,30 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
       reduceFn.clearState(renamedContext);
       watermarkHold.clearHolds(renamedContext);
       nonEmptyPanes.clearPane(renamedContext.state());
+      // These calls work irrespective of whether the window is active or not, but
+      // are unnecessary if the window is not active.
       triggerRunner.clearState(
           directContext.window(), directContext.timers(), directContext.state());
+      paneInfoTracker.clear(directContext.state());
     } else {
-      // Needed only for backwards compatibility over UPDATE.
-      // Clear any end-of-window or garbage collection holds keyed by the current window.
-      // Only needed if:
+      // If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed
(2).
+      // For (1), if !activeWindows.isActive then the window must be merging and has been
+      // explicitly removed by emitIfAppropriate. But in that case the trigger must have
fired
+      // and been closed, so this case reduces to (2).
+      // For (2), if triggerRunner.isClosed then the trigger was fired and entered the
+      // closed state. In that case emitIfAppropriate will have cleared all state in
+      // reduceFn, triggerRunner (except for finished bits), paneInfoTracker and activeWindows.
+      // We also know nonEmptyPanes must have been unconditionally cleared by the trigger.
+      // Since the trigger fired the existing watermark holds must have been cleared, and
since
+      // the trigger closed no new end of window or garbage collection hold will have been
+      // placed by WatermarkHold.extractAndRelease.
+      // Thus all the state clearing above is unnecessary.
+      //
+      // But(!) for backwards compatibility we must allow a pipeline to be updated from
+      // an sdk version <= 1.3. In that case it is possible we have end-of-window or
+      // garbage collection holds keyed by the current window (reached via directContext) rather
+      // than the state address window (reached via renamedContext).
+      // However this can only happen if:
       // - We have merging windows.
       // - We are DISCARDING_FIRED_PANES.
       // - A pane has fired.
@@ -676,7 +703,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
         watermarkHold.clearHolds(directContext);
       }
     }
-    paneInfoTracker.clear(directContext.state());
+
     // Don't need to track address state windows anymore.
     activeWindows.remove(directContext.window());
     // We'll never need to test for the trigger being closed again.
@@ -761,8 +788,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
 
   /**
    * Run the {@link ReduceFn#onTrigger} method and produce any necessary output.
+   *
+   * @return output watermark hold added, or {@literal null} if none.
    */
-  private void onTrigger(
+  @Nullable
+  private Instant onTrigger(
       final ReduceFn<K, InputT, OutputT, W>.Context directContext,
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
       boolean isFinished, boolean isEndOfWindow)
@@ -798,8 +828,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
       if (newHold.isAfter(directContext.window().maxTimestamp())) {
         // The hold must be for garbage collection, which can't have happened yet.
         Preconditions.checkState(
-          newHold.isEqual(
-            directContext.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness())),
+          newHold.isEqual(garbageCollectionTime(directContext.window())),
           "new hold %s should be at garbage collection for window %s plus %s",
           newHold,
           directContext.window(),
@@ -841,6 +870,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
 
       reduceFn.onTrigger(renamedTriggerContext);
     }
+
+    return newHold;
   }
 
   /**
@@ -864,39 +895,56 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
       ReduceFn<?, ?, ?, W>.Context directContext) {
     Instant inputWM = timerInternals.currentInputWatermarkTime();
     Instant endOfWindow = directContext.window().maxTimestamp();
-    Instant fireTime;
     String which;
+    Instant timer;
     if (endOfWindow.isBefore(inputWM)) {
-      fireTime = endOfWindow.plus(windowingStrategy.getAllowedLateness());
+      timer = garbageCollectionTime(directContext.window());
       which = "garbage collection";
     } else {
-      fireTime = endOfWindow;
+      timer = endOfWindow;
       which = "end-of-window";
     }
     WindowTracing.trace(
         "ReduceFnRunner.scheduleEndOfWindowOrGarbageCollectionTimer: Scheduling {} timer
at {} for "
-            + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
+        + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
         which,
-        fireTime,
+        timer,
         key,
         directContext.window(),
         inputWM,
         timerInternals.currentOutputWatermarkTime());
-    directContext.timers().setTimer(fireTime, TimeDomain.EVENT_TIME);
-    return fireTime;
+    Preconditions.checkState(!timer.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+                             "Timer %s is beyond end-of-time", timer);
+    directContext.timers().setTimer(timer, TimeDomain.EVENT_TIME);
+    return timer;
   }
 
-  private void cancelEndOfWindowAndGarbageCollectionTimers(ReduceFn<?, ?, ?, W>.Context
context) {
+  private void cancelEndOfWindowAndGarbageCollectionTimers(
+      ReduceFn<?, ?, ?, W>.Context directContext) {
     WindowTracing.debug(
         "ReduceFnRunner.cancelEndOfWindowAndGarbageCollectionTimers: Deleting timers for
"
         + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
-        key, context.window(), timerInternals.currentInputWatermarkTime(),
+        key, directContext.window(), timerInternals.currentInputWatermarkTime(),
         timerInternals.currentOutputWatermarkTime());
-    Instant timer = context.window().maxTimestamp();
-    context.timers().deleteTimer(timer, TimeDomain.EVENT_TIME);
-    if (windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) {
-      timer = timer.plus(windowingStrategy.getAllowedLateness());
-      context.timers().deleteTimer(timer, TimeDomain.EVENT_TIME);
+    Instant eow = directContext.window().maxTimestamp();
+    directContext.timers().deleteTimer(eow, TimeDomain.EVENT_TIME);
+    Instant gc = garbageCollectionTime(directContext.window());
+    if (gc.isAfter(eow)) {
+      directContext.timers().deleteTimer(gc, TimeDomain.EVENT_TIME);
+    }
+  }
+
+  /**
+   * Return when {@code window} should be garbage collected. If the window is the GlobalWindow,
+   * that will be the end of the window. Otherwise, add the allowed lateness to the end of
+   * the window.
+   */
+  private Instant garbageCollectionTime(W window) {
+    Instant maxTimestamp = window.maxTimestamp();
+    if (maxTimestamp.isBefore(GlobalWindow.INSTANCE.maxTimestamp())) {
+      return maxTimestamp.plus(windowingStrategy.getAllowedLateness());
+    } else {
+      return maxTimestamp;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ab5b888/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
index eb9c257..ad842b5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.util;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.util.state.MergingStateAccessor;
 import org.apache.beam.sdk.util.state.ReadableState;
@@ -193,7 +194,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable
{
   public Instant addHolds(ReduceFn<?, ?, ?, W>.ProcessValueContext context) {
     Instant hold = addElementHold(context);
     if (hold == null) {
-      hold = addEndOfWindowOrGarbageCollectionHolds(context);
+      hold = addEndOfWindowOrGarbageCollectionHolds(context, false/*paneIsEmpty*/);
     }
     return hold;
   }
@@ -204,23 +205,31 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable
{
    */
   private Instant shift(Instant timestamp, W window) {
     Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
-    if (shifted.isBefore(timestamp)) {
-      throw new IllegalStateException(
-          String.format("OutputTimeFn moved element from %s to earlier time %s for window
%s",
-              timestamp, shifted, window));
-    }
-    if (!timestamp.isAfter(window.maxTimestamp()) && shifted.isAfter(window.maxTimestamp()))
{
-      throw new IllegalStateException(
-          String.format("OutputTimeFn moved element from %s to %s which is beyond end of
window %s",
-              timestamp, shifted, window));
-    }
+    Preconditions.checkState(!shifted.isBefore(timestamp),
+                             "OutputTimeFn moved element from %s to earlier time %s for window
%s",
+                             timestamp, shifted, window);
+    Preconditions.checkState(timestamp.isAfter(window.maxTimestamp())
+                             || !shifted.isAfter(window.maxTimestamp()),
+                             "OutputTimeFn moved element from %s to %s which is beyond end
of "
+                             + "window %s",
+                             timestamp, shifted, window);
 
     return shifted;
   }
 
   /**
-   * Add an element hold if possible. Return instant at which hold was added, or {@literal
null}
-   * if no hold was added.
+   * Attempt to add an 'element hold'. Return the {@link Instant} at which the hold was
+   * added (ie the element timestamp plus any forward shift requested by the
+   * {@link WindowingStrategy#getOutputTimeFn}), or {@literal null} if no hold was added.
+   * The hold is only added if both:
+   * <ol>
+   * <li>The backend will be able to respect it. In other words the output watermark
cannot
+   * be ahead of the proposed hold time.
+   * <li>A timer will be set (by {@link ReduceFnRunner}) to clear the hold by the end
of the
+   * window. In other words the input watermark cannot be ahead of the end of the window.
+   * </ol>
+   * The hold ensures the pane which incorporates the element will not be considered late by
+   * any downstream computation when it is eventually emitted.
    */
   @Nullable
   private Instant addElementHold(ReduceFn<?, ?, ?, W>.ProcessValueContext context)
{
@@ -232,11 +241,6 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable
{
     Instant outputWM = timerInternals.currentOutputWatermarkTime();
     Instant inputWM = timerInternals.currentInputWatermarkTime();
 
-    // Only add the hold if we can be sure:
-    // - the backend will be able to respect it
-    // (ie the hold is at or ahead of the output watermark), AND
-    // - a timer will be set to clear it by the end of window
-    // (ie the end of window is at or ahead of the input watermark).
     String which;
     boolean tooLate;
     // TODO: These case labels could be tightened.
@@ -250,6 +254,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable
{
     } else {
       which = "on time";
       tooLate = false;
+      Preconditions.checkState(!elementHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+                               "Element hold %s is beyond end-of-time", elementHold);
       context.state().access(elementHoldTag).add(elementHold);
     }
     WindowTracing.trace(
@@ -264,87 +270,136 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable
{
   /**
    * Add an end-of-window hold or, if too late for that, a garbage collection hold (if required).
    * Return the {@link Instant} at which hold was added, or {@literal null} if no hold was
added.
-   *
-   * <p>The end-of-window hold guarantees that an empty {@code ON_TIME} pane can be
given
-   * a timestamp which will not be considered beyond allowed lateness by any downstream computation.
    */
   @Nullable
-  private Instant addEndOfWindowOrGarbageCollectionHolds(ReduceFn<?, ?, ?, W>.Context
context) {
-    Instant hold = addEndOfWindowHold(context);
+  private Instant addEndOfWindowOrGarbageCollectionHolds(
+      ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
+    Instant hold = addEndOfWindowHold(context, paneIsEmpty);
     if (hold == null) {
-      hold = addGarbageCollectionHold(context);
+      hold = addGarbageCollectionHold(context, paneIsEmpty);
     }
     return hold;
   }
 
   /**
-   * Add an end-of-window hold. Return the {@link Instant} at which hold was added,
-   * or {@literal null} if no hold was added.
+   * Attempt to add an 'end-of-window hold'. Return the {@link Instant} at which the hold was added
+   * (ie the end of window time), or {@literal null} if no end of window hold is possible and we
+   * should fall back to a garbage collection hold.
    *
-   * <p>The end-of-window hold guarantees that any empty {@code ON_TIME} pane can be
given
-   * a timestamp which will not be considered beyond allowed lateness by any downstream computation.
+   * <p>We only add the hold if we can be sure a timer will be set (by {@link ReduceFnRunner})
+   * to clear it. In other words, the input watermark cannot be ahead of the end of window
time.
+   *
+   * <p>An end-of-window hold is added in two situations:
+   * <ol>
+   * <li>An incoming element came in behind the output watermark (so we are too late
for placing
+   * the usual element hold), but it may still be possible to include the element in an
+   * {@link Timing#ON_TIME} pane. We place the end of window hold to ensure that pane will
+   * not be considered late by any downstream computation.
+   * <li>We guarantee an {@link Timing#ON_TIME} pane will be emitted for all windows
which saw at
+   * least one element, even if that {@link Timing#ON_TIME} pane is empty. Thus when elements
in
+   * a pane are processed due to a fired trigger we must set both an end of window timer
and an end
+   * of window hold. Again, the hold ensures the {@link Timing#ON_TIME} pane will not be
considered
+   * late by any downstream computation.
+   * </ol>
    */
   @Nullable
-  private Instant addEndOfWindowHold(ReduceFn<?, ?, ?, W>.Context context) {
-    // Only add an end-of-window hold if we can be sure a timer will be set to clear it
-    // by the end of window (ie the end of window is at or ahead of the input watermark).
+  private Instant addEndOfWindowHold(ReduceFn<?, ?, ?, W>.Context context, boolean
paneIsEmpty) {
     Instant outputWM = timerInternals.currentOutputWatermarkTime();
     Instant inputWM = timerInternals.currentInputWatermarkTime();
-
-    String which;
-    boolean tooLate;
     Instant eowHold = context.window().maxTimestamp();
+
     if (eowHold.isBefore(inputWM)) {
-      which = "too late for end-of-window timer";
-      tooLate = true;
-    } else {
-      which = "on time";
-      tooLate = false;
-      Preconditions.checkState(outputWM == null || !eowHold.isBefore(outputWM),
-          "End-of-window hold %s cannot be before output watermark %s", eowHold, outputWM);
-      context.state().access(EXTRA_HOLD_TAG).add(eowHold);
+      WindowTracing.trace(
+          "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is too late for "
+          + "end-of-window timer for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
+          eowHold, context.key(), context.window(), inputWM, outputWM);
+      return null;
     }
+
+    Preconditions.checkState(outputWM == null || !eowHold.isBefore(outputWM),
+                             "End-of-window hold %s cannot be before output watermark %s",
+                             eowHold, outputWM);
+    Preconditions.checkState(!eowHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+                             "End-of-window hold %s is beyond end-of-time", eowHold);
+    // If paneIsEmpty then this hold is just for empty ON_TIME panes, so we want to keep
+    // the hold away from the combining function in elementHoldTag.
+    // However if !paneIsEmpty then it could make sense to use the elementHoldTag here.
+    // Alas, onMerge is forced to add an end of window or garbage collection hold without
+    // knowing whether an element hold is already in place (stopping to check is too expensive).
+    // Thus it would end up adding an element hold at the end of the window which could
+    // upset the elementHoldTag combining function.
+    context.state().access(EXTRA_HOLD_TAG).add(eowHold);
     WindowTracing.trace(
-        "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is {} for "
+        "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is on time for "
         + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
-        eowHold, which, context.key(), context.window(), inputWM,
-        outputWM);
-
-    return tooLate ? null : eowHold;
+        eowHold, context.key(), context.window(), inputWM, outputWM);
+    return eowHold;
   }
 
   /**
-   * Add a garbage collection hold, if required. Return the {@link Instant} at which hold
was added,
+   * Attempt to add a 'garbage collection hold' if it is required. Return the {@link Instant}
at
+   * which the hold was added (ie the end of window time plus allowed lateness),
    * or {@literal null} if no hold was added.
    *
-   * <p>The garbage collection hold gurantees that any empty final pane can be given
-   * a timestamp which will not be considered beyond allowed lateness by any downstream
-   * computation. If we are sure no empty final panes can be emitted then there's no need
-   * for an additional hold.
+   * <p>We only add the hold if it is distinct from what would be added by
+   * {@link #addEndOfWindowHold}. In other words, {@link WindowingStrategy#getAllowedLateness}
+   * must be non-zero.
+   *
+   * <p>A garbage collection hold is added in two situations:
+   * <ol>
+   * <li>An incoming element came in behind the output watermark, and was too late
for placing
+   * the usual element hold or an end of window hold. Place the garbage collection hold so
that
+   * we can guarantee when the pane is finally triggered its output will not be dropped due
to
+   * excessive lateness by any downstream computation.
+   * <li>The {@link WindowingStrategy#getClosingBehavior()} is
+   * {@link ClosingBehavior#FIRE_ALWAYS}, and thus we guarantee a final pane will be emitted
+   * for all windows which saw at least one element. Again, the garbage collection hold guarantees
+   * that any empty final pane can be given a timestamp which will not be considered beyond
+   * allowed lateness by any downstream computation.
+   * </ol>
+   *
+   * <p>We use {@code paneIsEmpty} to distinguish cases 1 and 2.
    */
   @Nullable
-  private Instant addGarbageCollectionHold(ReduceFn<?, ?, ?, W>.Context context) {
-    // Only add a garbage collection hold if we may need to emit an empty pane
-    // at garbage collection time, and garbage collection time is strictly after the
-    // end of window. (All non-empty panes will have holds at their output
-    // time derived from their incoming elements and no additional hold is required.)
-    if (context.windowingStrategy().getClosingBehavior() == ClosingBehavior.FIRE_ALWAYS
-        && windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) {
-      Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness());
-      Instant outputWM = timerInternals.currentOutputWatermarkTime();
-      Instant inputWM = timerInternals.currentInputWatermarkTime();
+  private Instant addGarbageCollectionHold(
+      ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
+    Instant outputWM = timerInternals.currentOutputWatermarkTime();
+    Instant inputWM = timerInternals.currentInputWatermarkTime();
+    Instant eow = context.window().maxTimestamp();
+    Instant gcHold = eow.plus(windowingStrategy.getAllowedLateness());
 
+    if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) {
       WindowTracing.trace(
-          "WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for "
-          + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
+          "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary
"
+          + "since no allowed lateness for key:{}; window:{}; inputWatermark:{}; "
+          + "outputWatermark:{}",
+          gcHold, context.key(), context.window(), inputWM, outputWM);
+      return null;
+    }
+
+    if (paneIsEmpty && context.windowingStrategy().getClosingBehavior()
+                       == ClosingBehavior.FIRE_IF_NON_EMPTY) {
+      WindowTracing.trace(
+          "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary
"
+          + "since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; inputWatermark:{};
"
+          + "outputWatermark:{}",
           gcHold, context.key(), context.window(), inputWM, outputWM);
-      Preconditions.checkState(!gcHold.isBefore(inputWM),
-          "Garbage collection hold %s cannot be before input watermark %s", gcHold, inputWM);
-      context.state().access(EXTRA_HOLD_TAG).add(gcHold);
-      return gcHold;
-    } else {
       return null;
     }
+
+    Preconditions.checkState(!gcHold.isBefore(inputWM),
+                             "Garbage collection hold %s cannot be before input watermark
%s",
+                             gcHold, inputWM);
+    Preconditions.checkState(!gcHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+                             "Garbage collection hold %s is beyond end-of-time", gcHold);
+    // Same EXTRA_HOLD_TAG vs elementHoldTag discussion as in addEndOfWindowHold above.
+    context.state().access(EXTRA_HOLD_TAG).add(gcHold);
+
+    WindowTracing.trace(
+        "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is on time
for "
+        + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
+        gcHold, context.key(), context.window(), inputWM, outputWM);
+    return gcHold;
   }
 
   /**
@@ -360,17 +415,19 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable
{
    * watermark hold, then earlier holds may be released.
    */
   public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) {
-    WindowTracing.debug("onMerge: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
-        context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
-        timerInternals.currentOutputWatermarkTime());
+    WindowTracing.debug("WatermarkHold.onMerge: for key:{}; window:{}; inputWatermark:{};
"
+                        + "outputWatermark:{}",
+                        context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
+                        timerInternals.currentOutputWatermarkTime());
     StateMerging.mergeWatermarks(context.state(), elementHoldTag, context.window());
     // If we had a cheap way to determine if we have an element hold then we could
     // avoid adding an unnecessary end-of-window or garbage collection hold.
     // Simply reading the above merged watermark would impose an additional read for the
-    // common case that the active window has just one undelying state address window and
-    // the hold depends on the min of the elemest timestamps.
+    // common case that the active window has just one underlying state address window and
+    // the hold depends on the min of the element timestamps.
+    // At least one merged window must be non-empty for the merge to have been triggered.
     StateMerging.clear(context.state(), EXTRA_HOLD_TAG);
-    addEndOfWindowOrGarbageCollectionHolds(context);
+    addEndOfWindowOrGarbageCollectionHolds(context, false /*paneIsEmpty*/);
   }
 
   /**
@@ -398,7 +455,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable
{
   public ReadableState<OldAndNewHolds> extractAndRelease(
       final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) {
     WindowTracing.debug(
-        "extractAndRelease: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
+        "WatermarkHold.extractAndRelease: for key:{}; window:{}; inputWatermark:{}; "
+        + "outputWatermark:{}",
         context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
         timerInternals.currentOutputWatermarkTime());
     final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag);
@@ -447,7 +505,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable
{
         if (!isFinished) {
           // Only need to leave behind an end-of-window or garbage collection hold
           // if future elements will be processed.
-          newHold = addEndOfWindowOrGarbageCollectionHolds(context);
+          newHold = addEndOfWindowOrGarbageCollectionHolds(context, true /*paneIsEmpty*/);
         }
 
         return new OldAndNewHolds(oldHold, newHold);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ab5b888/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
index 41c1710..0df4bc6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -48,6 +48,8 @@ import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -62,6 +64,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -1201,6 +1204,116 @@ public class ReduceFnRunnerTest {
         WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 3, 2)));
   }
 
+  /**
+   * We should fire a non-empty ON_TIME pane in the GlobalWindow when the watermark moves to
+   * end-of-time.
+   */
+  @Test
+  public void fireNonEmptyOnDrainInGlobalWindow() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, GlobalWindow> tester =
+        ReduceFnTester.nonCombining(
+            WindowingStrategy.of(new GlobalWindows())
+                             .withTrigger(Repeatedly.<GlobalWindow>forever(
+                                 AfterPane.elementCountAtLeast(3)))
+                             .withMode(AccumulationMode.DISCARDING_FIRED_PANES));
+
+    tester.advanceInputWatermark(new Instant(0));
+
+    final int n = 20;
+    for (int i = 0; i < n; i++) {
+      tester.injectElements(TimestampedValue.of(i, new Instant(i)));
+    }
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertEquals(n / 3, output.size());
+    for (int i = 0; i < output.size(); i++) {
+      assertEquals(Timing.EARLY, output.get(i).getPane().getTiming());
+      assertEquals(i, output.get(i).getPane().getIndex());
+      assertEquals(3, Iterables.size(output.get(i).getValue()));
+    }
+
+    tester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    output = tester.extractOutput();
+    assertEquals(1, output.size());
+    assertEquals(Timing.ON_TIME, output.get(0).getPane().getTiming());
+    assertEquals(n / 3, output.get(0).getPane().getIndex());
+    assertEquals(n - ((n / 3) * 3), Iterables.size(output.get(0).getValue()));
+  }
+
+  /**
+   * We should fire an empty ON_TIME pane in the GlobalWindow when the watermark moves to
+   * end-of-time.
+   */
+  @Test
+  public void fireEmptyOnDrainInGlobalWindowIfRequested() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, GlobalWindow> tester =
+        ReduceFnTester.nonCombining(
+            WindowingStrategy.of(new GlobalWindows())
+                             .withTrigger(Repeatedly.<GlobalWindow>forever(
+                                 AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
+                                     new Duration(3))))
+                             .withMode(AccumulationMode.DISCARDING_FIRED_PANES));
+
+    final int n = 20;
+    for (int i = 0; i < n; i++) {
+      tester.advanceProcessingTime(new Instant(i));
+      tester.injectElements(TimestampedValue.of(i, new Instant(i)));
+    }
+    tester.advanceProcessingTime(new Instant(n + 4));
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertEquals((n + 3) / 4, output.size());
+    for (int i = 0; i < output.size(); i++) {
+      assertEquals(Timing.EARLY, output.get(i).getPane().getTiming());
+      assertEquals(i, output.get(i).getPane().getIndex());
+      assertEquals(4, Iterables.size(output.get(i).getValue()));
+    }
+
+    tester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    output = tester.extractOutput();
+    assertEquals(1, output.size());
+    assertEquals(Timing.ON_TIME, output.get(0).getPane().getTiming());
+    assertEquals((n + 3) / 4, output.get(0).getPane().getIndex());
+    assertEquals(0, Iterables.size(output.get(0).getValue()));
+  }
+
+  /**
+   * Late elements should still have a garbage collection hold set so that they
+   * can make a late pane rather than be dropped due to lateness.
+   */
+  @Test
+  public void setGarbageCollectionHoldOnLateElements() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(
+            FixedWindows.of(Duration.millis(10)),
+            AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(2)),
+            AccumulationMode.DISCARDING_FIRED_PANES,
+            Duration.millis(100),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    tester.advanceInputWatermark(new Instant(0));
+    tester.advanceOutputWatermark(new Instant(0));
+    tester.injectElements(TimestampedValue.of(1,  new Instant(1)));
+
+    // Fire ON_TIME pane @ 9 with 1
+
+    tester.advanceInputWatermark(new Instant(109));
+    tester.advanceOutputWatermark(new Instant(109));
+    tester.injectElements(TimestampedValue.of(2,  new Instant(2)));
+    // We should have set a garbage collection hold for the final pane.
+    Instant hold = tester.getWatermarkHold();
+    assertEquals(new Instant(109), hold);
+
+    tester.advanceInputWatermark(new Instant(110));
+    tester.advanceOutputWatermark(new Instant(110));
+
+    // Fire final LATE pane @ 9 with 2
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertEquals(2, output.size());
+  }
+
   private static class SumAndVerifyContextFn extends CombineFnWithContext<Integer, int[], Integer> {
 
     private final PCollectionView<Integer> view;



Mime
View raw message