beam-commits mailing list archives

From stasle...@apache.org
Subject [1/2] beam git commit: [BEAM-2859] Fixed processing timers not being properly fired when watermark stays put by tweaking the way spark-runner was delivering timers to reduceFnRunner in SparkGroupAlsoByWindowViaWindowSet
Date Wed, 13 Sep 2017 08:04:42 GMT
Repository: beam
Updated Branches:
  refs/heads/master 8d71ebf82 -> 50532f0a9


[BEAM-2859] Fixed processing timers not being properly fired when the watermark stays put, by tweaking
the way the Spark runner delivers timers to the reduceFnRunner in SparkGroupAlsoByWindowViaWindowSet


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c3d4c5d9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c3d4c5d9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c3d4c5d9

Branch: refs/heads/master
Commit: c3d4c5d98cc115dce7e03e64cd29713562ff62b3
Parents: 8d71ebf
Author: Stas Levin <staslevin@apache.org>
Authored: Tue Sep 12 10:34:45 2017 +0300
Committer: Stas Levin <staslevin@apache.org>
Committed: Wed Sep 13 11:04:08 2017 +0300

----------------------------------------------------------------------
 .../SparkGroupAlsoByWindowViaWindowSet.java     | 82 +++++++++++++-------
 .../spark/stateful/SparkTimerInternals.java     | 15 ----
 2 files changed, 56 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
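The core of this change is the eligibility check introduced below: an event-time timer is delivered to the ReduceFnRunner only once the input watermark has passed its timestamp, while timers in any other time domain (e.g. processing time) are always delivered. The following is a minimal, standalone sketch of that predicate; it uses simplified stand-in types (a plain Timer class, java.time and streams) rather than the runner's actual TimerInternals.TimerData and Guava code shown in the diff:

import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

public class TimerEligibilitySketch {

  enum TimeDomain { EVENT_TIME, PROCESSING_TIME, SYNCHRONIZED_PROCESSING_TIME }

  // Simplified stand-in for TimerInternals.TimerData.
  static final class Timer {
    final TimeDomain domain;
    final Instant timestamp;

    Timer(TimeDomain domain, Instant timestamp) {
      this.domain = domain;
      this.timestamp = timestamp;
    }
  }

  // A timer is eligible for delivery if it is not an event-time timer, or if the
  // input watermark has already passed its timestamp.
  static List<Timer> filterTimersEligibleForProcessing(
      List<Timer> timers, Instant inputWatermark) {
    return timers.stream()
        .filter(
            timer ->
                timer.domain != TimeDomain.EVENT_TIME
                    || inputWatermark.isAfter(timer.timestamp))
        .collect(Collectors.toList());
  }
}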


http://git-wip-us.apache.org/repos/asf/beam/blob/c3d4c5d9/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 2258f05..1fb8700 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -18,7 +18,9 @@
 package org.apache.beam.runners.spark.stateful;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
 import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Table;
 import java.io.Serializable;
@@ -51,6 +53,7 @@ import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -204,6 +207,32 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
         this.droppedDueToLateness = droppedDueToLateness;
       }
 
+      /**
+       * Retrieves the timers that are eligible for processing by {@link
+       * org.apache.beam.runners.core.ReduceFnRunner}.
+       *
+       * @return A collection of timers that are eligible for processing. For a {@link
+       *     TimeDomain#EVENT_TIME} timer, this implies that the watermark has passed the timer's
+       *     timestamp. For other <code>TimeDomain</code>s (e.g., {@link
+       *     TimeDomain#PROCESSING_TIME}), a timer is always considered eligible for processing (no
+       *     restrictions).
+       */
+      private Collection<TimerInternals.TimerData> filterTimersEligibleForProcessing(
+          final Collection<TimerInternals.TimerData> timers, final Instant inputWatermark) {
+        final Predicate<TimerInternals.TimerData> eligibleForProcessing =
+            new Predicate<TimerInternals.TimerData>() {
+
+              @Override
+              public boolean apply(final TimerInternals.TimerData timer) {
+                return !timer.getDomain().equals(TimeDomain.EVENT_TIME)
+                    || inputWatermark.isAfter(timer.getTimestamp());
+              }
+            };
+
+        return FluentIterable.from(timers).filter(eligibleForProcessing).toSet();
+      }
+
+
       @Override
       protected Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>
           computeNext() {
@@ -268,16 +297,14 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
 
               LOG.trace(logPrefix + ": input elements: {}", elements);
 
-              /*
-              Incoming expired windows are filtered based on
-              timerInternals.currentInputWatermarkTime() and the configured allowed
-              lateness. Note that this is done prior to calling
-              timerInternals.advanceWatermark so essentially the inputWatermark is
-              the highWatermark of the previous batch and the lowWatermark of the
-              current batch.
-              The highWatermark of the current batch will only affect filtering
-              as of the next batch.
-               */
+              // Incoming expired windows are filtered based on
+              // timerInternals.currentInputWatermarkTime() and the configured allowed
+              // lateness. Note that this is done prior to calling
+              // timerInternals.advanceWatermark so essentially the inputWatermark is
+              // the highWatermark of the previous batch and the lowWatermark of the
+              // current batch.
+              // The highWatermark of the current batch will only affect filtering
+              // as of the next batch.
               final Iterable<WindowedValue<InputT>> nonExpiredElements =
                   Lists.newArrayList(
                       LateDataUtils.dropExpiredWindows(
@@ -302,23 +329,26 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
             // store the highWatermark as the new inputWatermark to calculate triggers
             timerInternals.advanceWatermark();
 
-            LOG.debug(
-                logPrefix + ": timerInternals after advance are {}",
-                timerInternals.toString());
-
-            // call on timers that are ready.
-            final Collection<TimerInternals.TimerData> readyToProcess =
-                timerInternals.getTimersReadyToProcess();
-
-            LOG.debug(logPrefix + ": ready timers are {}", readyToProcess);
+            final Collection<TimerInternals.TimerData> timersEligibleForProcessing =
+                filterTimersEligibleForProcessing(
+                    timerInternals.getTimers(), timerInternals.currentInputWatermarkTime());
 
-            /*
-            Note that at this point, the watermark has already advanced since
-            timerInternals.advanceWatermark() has been called and the highWatermark
-            is now stored as the new inputWatermark, according to which triggers are
-            calculated.
-             */
-            reduceFnRunner.onTimers(readyToProcess);
+            LOG.debug(
+                logPrefix + ": timers eligible for processing are {}", timersEligibleForProcessing);
+
+            // Note that at this point, the watermark has already advanced since
+            // timerInternals.advanceWatermark() has been called and the highWatermark
+            // is now stored as the new inputWatermark, according to which triggers are
+            // calculated.
+            // Note 2: The implicit contract between the runner and reduceFnRunner is that
+            // event_time based triggers are only delivered if the watermark has passed their
+            // timestamp.
+            // Note 3: Timer cleanups are performed by the GC timer scheduled by reduceFnRunner as
+            // part of processing timers.
+            // Note 4: Even if a given timer is deemed eligible for processing, it does not
+            // necessarily mean that it will actually fire (firing is determined by the trigger
+            // itself, not the TimerInternals/TimerData objects).
+            reduceFnRunner.onTimers(timersEligibleForProcessing);
           } catch (final Exception e) {
             throw new RuntimeException("Failed to process ReduceFnRunner onTimer.", e);
           }

http://git-wip-us.apache.org/repos/asf/beam/blob/c3d4c5d9/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
index c998328..4fd8146 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
@@ -23,7 +23,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -102,20 +101,6 @@ public class SparkTimerInternals implements TimerInternals {
     return timers;
   }
 
-  /** This should only be called after processing the element. */
-  Collection<TimerData> getTimersReadyToProcess() {
-    Set<TimerData> toFire = Sets.newHashSet();
-    Iterator<TimerData> iterator = timers.iterator();
-    while (iterator.hasNext()) {
-      TimerData timer = iterator.next();
-      if (timer.getTimestamp().isBefore(inputWatermark)) {
-        toFire.add(timer);
-        iterator.remove();
-      }
-    }
-    return toFire;
-  }
-
   void addTimers(Iterable<TimerData> timers) {
     for (TimerData timer: timers) {
       this.timers.add(timer);
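
To see why the removed getTimersReadyToProcess check kept processing-time timers from firing while the watermark stood still, consider a hedged illustration with made-up timestamps (the values and the class below are hypothetical and only for illustration, not part of the commit):

import java.time.Instant;

public class StalledWatermarkIllustration {
  public static void main(String[] args) {
    // Hypothetical values: the input watermark is stuck at 12:00:00Z.
    Instant inputWatermark = Instant.parse("2017-09-13T12:00:00Z");
    // A processing-time timer set to fire five minutes later.
    Instant processingTimerTimestamp = Instant.parse("2017-09-13T12:05:00Z");

    // Old check (getTimersReadyToProcess): every timer, regardless of domain,
    // had to have a timestamp before the input watermark, so this timer was
    // never returned while the watermark stayed put.
    boolean deliveredByOldCheck = processingTimerTimestamp.isBefore(inputWatermark); // false

    // New check (filterTimersEligibleForProcessing): only EVENT_TIME timers are
    // gated on the watermark; a processing-time timer is always delivered.
    boolean isEventTimeTimer = false;
    boolean deliveredByNewCheck =
        !isEventTimeTimer || inputWatermark.isAfter(processingTimerTimestamp); // true

    System.out.println("old check delivers the timer: " + deliveredByOldCheck);
    System.out.println("new check delivers the timer: " + deliveredByNewCheck);
  }
}

The old method also removed the timers it returned from SparkTimerInternals; with the new flow, cleanup is left to the GC timer scheduled by the reduceFnRunner, as noted in the comments in the diff above.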

