spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marmbrus <>
Subject [GitHub] spark pull request #15702: [SPARK-18124] Observed-delay based Even Time Wate...
Date Mon, 31 Oct 2016 22:22:08 GMT
GitHub user marmbrus opened a pull request:

    [SPARK-18124] Observed-delay based Even Time Watermarks

    This PR adds a new method `withWatermark` to the `Dataset` API, which can be used specify
an _event time watermark_.  An event time watermark allows the streaming engine to reason
about the point in time after which we no longer expect to see late data.  This PR also has
augmented `StreamExecution` to use this watermark for several purposes:
      - To know when a given time window aggregation is finalized and thus results can be
emitted when using output modes that do not allow updates (e.g. `Append` mode).
      - To minimize the amount of state that we need to keep for on-going aggregations, by
evicting state for groups that are no longer expected to change.  Note that we do still maintain
all state if required (i.e. when in `Complete` mode).
    An example that emits windowed counts of records, waiting up to 5 minutes for late data
to arrive.
    df.withWatermark($"eventTime", "5 mintues")
      .groupBy(window($"eventTime", "1 minute) as 'window)
      .mode("append") // In append mode, we only output complete aggregations.
    ### Calculating the watermark.
    The current event time is computed by looking at the `MAX(eventTime)` seen this epoch
across all of the partitions in the query minus some user defined _delayThreshold_.  Note
that since we must coordinate this value across partitions occasionally, the actual watermark
used is only guaranteed to be at least `delay` behind the actual event time.  In some cases
we may still process records that arrive more than delay late.
    This mechanism was chosen for the initial implementation over processing time for two
      - it is robust to downtime that could affect processing delay
      - it does not require syncing of time or timezones across
    ### Other notable implementation details
     - A new trigger metric `eventTimeWatermark` outputs the current value of the watermark.
     - We mark the event time column in the `Attribute` metadata using the key `spark.watermarkDelay`.
 This allows downstream operations to know which column holds the event time.  Operations
like `window` propagate this metadata.
     - `explain()` marks the watermark with a suffix of `-T${delayMs}` to ease debugging of
how this information is propagated.
     - Currently, we don't filter out late records, but instead rely on the state store to
avoid emitting records that are both added and filtered in the same epoch.
    ### Remaining in this PR
     - [ ] The test for recovery is currently failing as we don't record the watermark used
in the offset log.  We will need to do so to ensure determinism, but this is deferred until
#15626 is merged.
    ### Other follow-ups
    There are some natural additional features that we should consider for future work:
     - Ability to write records that arrive too late to some external store in case any out-of-band
remediation is required.
     - `Update` mode so you can get partial results before a group is evicted.
     - Other mechanisms for calculating the watermark.  In particular a watermark based on
quantiles would be more robust to outliers.

You can merge this pull request into a Git repository by running:

    $ git pull watermarks

Alternatively you can review and apply these changes as the patch at:

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #15702
commit e6e3bbe9ca2d2081264b5bff68293572af7778a7
Author: Michael Armbrust <>
Date:   2016-10-28T04:11:19Z

    first test passing

commit 92320720492f192fe6791d0fea90495ea5db94a7
Author: Michael Armbrust <>
Date:   2016-10-28T07:55:57Z


commit 5b921323092c5730f816795193a6e0d985d7e430
Author: Michael Armbrust <>
Date:   2016-10-31T22:00:32Z

    Merge remote-tracking branch 'origin/master' into watermarks


If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at or file a JIRA ticket
with INFRA.

To unsubscribe, e-mail:
For additional commands, e-mail:

View raw message