beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/3] incubator-beam git commit: Basic non-null checks
Date Fri, 25 Mar 2016 20:48:42 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 00f608f05 -> 49d82baf1


Basic non-null checks


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1c89a1b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1c89a1b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1c89a1b3

Branch: refs/heads/master
Commit: 1c89a1b3ac0ff296003ae443e6b4763f501b8ada
Parents: 00f608f
Author: Mark Shields <markshields@google.com>
Authored: Wed Mar 2 20:45:59 2016 -0800
Committer: Mark Shields <markshields@google.com>
Committed: Fri Mar 25 12:28:48 2016 -0700

----------------------------------------------------------------------
 .../cloud/dataflow/sdk/util/ReduceFnRunner.java | 51 +++++++++++++++++-
 .../cloud/dataflow/sdk/util/TriggerRunner.java  |  2 +
 .../cloud/dataflow/sdk/util/WatermarkHold.java  | 55 +++++++++++++++-----
 3 files changed, 92 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c89a1b3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 2e2d1f6..f1d4582 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -499,6 +499,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
           directContext.timestamp(),
           directContext.timers(),
           directContext.state());
+
+      // At this point, if triggerRunner.shouldFire before the processValue then
+      // triggerRunner.shouldFire after the processValue. In other words adding values
+      // cannot take a trigger state from firing to non-firing.
+      // (We don't actually assert this since it is too slow.)
     }
 
     return windows;
@@ -568,6 +573,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
           && timer.getTimestamp().equals(window.maxTimestamp());
       if (isEndOfWindow) {
+        // If the window strategy trigger includes a watermark trigger then at this point
+        // there should be no data holds, either because we'd already cleared them on an
+        // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate.
+        // We could assert this but it is very expensive.
+
         // Since we are processing an on-time firing we should schedule the garbage collection
         // timer. (If getAllowedLateness is zero then the timer event will be considered
a
         // cleanup event and handled by the above).
@@ -715,8 +725,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
       boolean isFinished)
           throws Exception {
+    Instant inputWM = timerInternals.currentInputWatermarkTime();
+    Preconditions.checkNotNull(inputWM);
+
     // Prefetch necessary states
-    ReadableState<Instant> outputTimestampFuture =
+    ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
         watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
     ReadableState<PaneInfo> paneFuture =
         paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
@@ -729,7 +742,41 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     // Calculate the pane info.
     final PaneInfo pane = paneFuture.read();
     // Extract the window hold, and as a side effect clear it.
-    final Instant outputTimestamp = outputTimestampFuture.read();
+
+    WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
+    final Instant outputTimestamp = pair.oldHold;
+    @Nullable Instant newHold = pair.newHold;
+
+    if (newHold != null && inputWM != null) {
+      // We can't be finished yet.
+      Preconditions.checkState(
+        !isFinished, "new hold at %s but finished %s", newHold, directContext.window());
+      // The hold cannot be behind the input watermark.
+      Preconditions.checkState(
+        !newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM);
+      if (newHold.isAfter(directContext.window().maxTimestamp())) {
+        // The hold must be for garbage collection, which can't have happened yet.
+        Preconditions.checkState(
+          newHold.isEqual(
+            directContext.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness())),
+          "new hold %s should be at garbage collection for window %s plus %s",
+          newHold,
+          directContext.window(),
+          windowingStrategy.getAllowedLateness());
+      } else {
+        // The hold must be for the end-of-window, which can't have happened yet.
+        Preconditions.checkState(
+          newHold.isEqual(directContext.window().maxTimestamp()),
+          "new hold %s should be at end of window %s",
+          newHold,
+          directContext.window());
+        Preconditions.checkState(
+          !isEndOfWindow,
+          "new hold at %s for %s but this is the watermark trigger",
+          newHold,
+          directContext.window());
+      }
+    }
 
     // Only emit a pane if it has data or empty panes are observable.
     if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c89a1b3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
index dcfd035..8fc4981 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
@@ -172,6 +172,8 @@ public class TriggerRunner<W extends BoundedWindow> {
   }
 
   public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
+    // shouldFire should be false.
+    // However it is too expensive to assert.
     FinishedTriggersBitSet finishedSet =
         readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
     Trigger<W>.TriggerContext context = contextFactory.base(window, timers,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c89a1b3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
index d537ddb..31e36c5 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
@@ -228,6 +228,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
 
     Instant outputWM = timerInternals.currentOutputWatermarkTime();
     Instant inputWM = timerInternals.currentInputWatermarkTime();
+    Preconditions.checkNotNull(inputWM);
 
     // Only add the hold if we can be sure:
     // - the backend will be able to respect it
@@ -287,6 +288,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
     // by the end of window (ie the end of window is at or ahead of the input watermark).
     Instant outputWM = timerInternals.currentOutputWatermarkTime();
     Instant inputWM = timerInternals.currentInputWatermarkTime();
+    Preconditions.checkNotNull(inputWM);
+
     String which;
     boolean tooLate;
     Instant eowHold = context.window().maxTimestamp();
@@ -329,6 +332,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
       Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness());
       Instant outputWM = timerInternals.currentOutputWatermarkTime();
       Instant inputWM = timerInternals.currentInputWatermarkTime();
+      Preconditions.checkNotNull(inputWM);
+
       WindowTracing.trace(
           "WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for "
           + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
@@ -369,6 +374,19 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
   }
 
   /**
+   * Result of {@link #extractAndRelease}.
+   */
+  public static class OldAndNewHolds {
+    public final Instant oldHold;
+    @Nullable public final Instant newHold;
+
+    public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) {
+      this.oldHold = oldHold;
+      this.newHold = newHold;
+    }
+  }
+
+  /**
    * Return (a future for) the earliest hold for {@code context}. Clear all the holds after
    * reading, but add/restore an end-of-window or garbage collection hold if required.
    *
@@ -377,7 +395,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
    * elements in the current pane. If there is no such value the timestamp is the end
    * of the window.
    */
-  public ReadableState<Instant> extractAndRelease(
+  public ReadableState<OldAndNewHolds> extractAndRelease(
       final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) {
     WindowTracing.debug(
         "extractAndRelease: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
@@ -385,38 +403,38 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
         timerInternals.currentOutputWatermarkTime());
     final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag);
     final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG);
-    return new ReadableState<Instant>() {
+    return new ReadableState<OldAndNewHolds>() {
       @Override
-      public ReadableState<Instant> readLater() {
+      public ReadableState<OldAndNewHolds> readLater() {
         elementHoldState.readLater();
         extraHoldState.readLater();
         return this;
       }
 
       @Override
-      public Instant read() {
+      public OldAndNewHolds read() {
         // Read both the element and extra holds.
         Instant elementHold = elementHoldState.read();
         Instant extraHold = extraHoldState.read();
-        Instant hold;
+        Instant oldHold;
         // Find the minimum, accounting for null.
         if (elementHold == null) {
-          hold = extraHold;
+          oldHold = extraHold;
         } else if (extraHold == null) {
-          hold = elementHold;
+          oldHold = elementHold;
         } else if (elementHold.isBefore(extraHold)) {
-          hold = elementHold;
+          oldHold = elementHold;
         } else {
-          hold = extraHold;
+          oldHold = extraHold;
         }
-        if (hold == null || hold.isAfter(context.window().maxTimestamp())) {
+        if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) {
           // If no hold (eg because all elements came in behind the output watermark), or
           // the hold was for garbage collection, take the end of window as the result.
           WindowTracing.debug(
               "WatermarkHold.extractAndRelease.read: clipping from {} to end of window "
               + "for key:{}; window:{}",
-              hold, context.key(), context.window());
-          hold = context.window().maxTimestamp();
+              oldHold, context.key(), context.window());
+          oldHold = context.window().maxTimestamp();
         }
         WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}",
             context.key(), context.window());
@@ -425,13 +443,14 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
         elementHoldState.clear();
         extraHoldState.clear();
 
+        @Nullable Instant newHold = null;
         if (!isFinished) {
           // Only need to leave behind an end-of-window or garbage collection hold
           // if future elements will be processed.
-          addEndOfWindowOrGarbageCollectionHolds(context);
+          newHold = addEndOfWindowOrGarbageCollectionHolds(context);
         }
 
-        return hold;
+        return new OldAndNewHolds(oldHold, newHold);
       }
     };
   }
@@ -447,4 +466,12 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
     context.state().access(elementHoldTag).clear();
     context.state().access(EXTRA_HOLD_TAG).clear();
   }
+
+  /**
+   * Return the current data hold, or null if none. Does not clear. For debugging only.
+   */
+  @Nullable
+  public Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) {
+    return context.state().access(elementHoldTag).read();
+  }
 }


Mime
View raw message