beam-user mailing list archives

From Stephan Kotze <>
Subject Re: Global Window - Correctness/Completeness
Date Wed, 23 May 2018 18:40:25 GMT
Indeed, and very true :)

More a flight of fancy than anything else.

Trying hard to find the right tool for the job, but something like this
could potentially (risks aside for this conversation, as it is potentially a
hairy implementation with more effort and risk than it is worth) reduce the
need for us to either rewrite the app in Flink or do some parts in Beam and
others in Flink.

Dunno ;)


On Wed, May 23, 2018 at 7:22 PM Lukasz Cwik <> wrote:

> You could, but you would need to change how the translation of an Apache Beam
> pipeline is done within Flink
> (and possibly its superclass). You would need to take care of how
> stream.keyBy(...).orderByTime() affects windowing and triggering behavior with
> regard to Apache Beam, and you may need to be quite well versed in both the
> Apache Flink execution model and the Apache Beam model.
> You might as well use Flink instead of writing an Apache Beam pipeline.
> On Wed, May 23, 2018 at 10:20 AM Stephan Kotze <>
> wrote:
>> Thanks Lukasz.
>> As a hypothetical question (to anyone more familiar with the runner than I):
>> If one is using the Flink runner, would it theoretically be possible to
>> modify the runner so that it constructs its pipelines as:
>> stream.keyBy(...).orderByTime() <>.
>> Unless I'm missing something, this could be a cheeky way (bypassing the
>> Beam API) to enforce stricter ordering (with all the ensuing costs) on a
>> runner that would appear to support the desired semantics?
>> Stephan
>> On Wed, May 23, 2018 at 5:41 PM Lukasz Cwik <> wrote:
>>> The issue with Apache Beam and many stream processing systems is that
>>> they don't support back edges, since back edges produce loops which may or
>>> may not be resolvable.
>>> One way to work around this is to emit your data to a pubsub system like
>>> Kafka or GCP Pubsub and read that as a source within your pipeline in
>>> addition to your normal source.
>>> OriginalSource --> 5 min aggregations --+
>>>                                         +--> CoGBK --> current sum = current aggregation + past sum --+--> OriginalSink
>>> BackEdgeSource -------------------------+                                                             \--> BackEdgeSink
>>> This allows the OriginalSource and BackEdgeSource to each maintain
>>> their own watermark, and lets the CoGBK (or Flatten + GBK if the inputs have
>>> the same key and value types) produce output based upon the windowing
>>> strategy you define (no manual state/timer management!). It's important
>>> that the elements emitted to the back edge carry timestamps for the
>>> window they will be used in: for a 5 min aggregation window, if the current
>>> sum is at time X, you should emit it at X + 5 min so that it becomes the
>>> past sum for the next window. Similarly, it would be wise to use triggers
>>> which only fire once (so no speculative or late firings).
>>> If you don't want to use the solution above, then using the global window
>>> and a stateful DoFn makes the most sense based upon what you described: a
>>> DoFn that tracks the past X windows and eventually decides "OK, I'm as
>>> consistent as I am willing to be", re-orders the events, emits results,
>>> purges the "complete set" and ignores any further late elements (late due
>>> to internal Beam processing rather than late data). Note that stateful
>>> DoFn state is partitioned by key and window, so if you use 5 minute windows
>>> you'll only see the state that was recorded for those 5 minutes, which is
>>> why it's important to use the global window. Also note that it's easy to
>>> have "leaks" in stateful DoFns in the global window, since the watermark
>>> will never pass the point in time at which the state data would be garbage
>>> collected, so state you no longer need has to be cleared explicitly.
>>> You might have better success with a simpler application, though; it all
>>> depends on what the rest of the system looks like and what you're
>>> integrating with.
>>> On Wed, May 23, 2018 at 4:43 AM Stephan Kotze <>
>>> wrote:
>>>> Hi
>>>> We are trying to implement a scenario that requires a rigid order of
>>>> events arriving into a Global Aggregation.
>>>> To protect the innocent, I've simplified and modified the exact
>>>> scenario into something like the following:
>>>>    1. For example, I want to calculate a client's all-time bank balance.
>>>>    2. At fixed intervals, I would like to emit this all-time balance
>>>>    and join it onto other fixed-window aggregates.
>>>> In principle something like this:
>>>> [image: image.png]
>>>> Now the problem is that, given our reading and experimenting, the order
>>>> in which the fixed windows arrive in the global window does not appear to
>>>> be guaranteed.
>>>> This means that something like the following could potentially occur:
>>>> [image: image.png]
>>>> Essentially, even though the events/fixed windows are ordered and no
>>>> late data can arrive, we can at any given point in time emit an incorrect
>>>> balance for the particular person.
>>>> Given our current understanding of how the compute gets parallelised
>>>> (which is how Beam achieves its high throughput), this appears unavoidable.
>>>> Unfortunately, we do require a globally correct balance every time we
>>>> emit the value.
>>>> Are we missing something?
>>>> Does one need to sacrifice this consistency if using Beam?
>>>> Must one write some sort of stateful DoFn that keeps track of the
>>>> past X windows and eventually decides: OK, I'm as consistent as I am
>>>> willing to be; re-order the events, emit results, purge the "complete set",
>>>> ignore any further late elements (late due to internal Beam processing
>>>> rather than late data) and move along?
>>>> Or is there something else we can do? (Tweak some options :), or move
>>>> this calculation into another streaming framework with stricter
>>>> guarantees, for example.)
>>>> It seems like such an obvious use case at first glance, but learning
>>>> more about Beam seems to indicate that this is not actually a use case
>>>> that it is well suited for?
>>>> Stephan
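
The back-edge approach described in the thread can be modelled outside Beam to check the timestamp arithmetic. The sketch below is a plain-Python simulation for a single key, not Beam code: `WINDOW`, `window_start` and `running_sums` are hypothetical names, and it assumes the back-edge source's watermark cannot pass a window until the previous window's sum has been written, so windows are processed strictly in order. Each window's sum is re-emitted at X + 5 minutes, which makes it the past sum of the next window.

```python
from collections import defaultdict

WINDOW = 5  # fixed window size in minutes (the thread's 5 min example)


def window_start(ts: int) -> int:
    """Start of the fixed window that contains timestamp ts (in minutes)."""
    return ts - ts % WINDOW


def running_sums(events: list[tuple[int, int]], end_ts: int) -> dict[int, int]:
    """Simulate the back-edge pipeline for a single key (hypothetical helper).

    events: (timestamp_minutes, amount) pairs read from OriginalSource,
            in any order.
    end_ts: simulate the windows that start before this time.
    Returns {window_start: current_sum} as written to OriginalSink.
    """
    # "5 min aggregations": sum the amounts in each fixed window.
    aggregates: dict[int, int] = defaultdict(int)
    for ts, amount in events:
        aggregates[window_start(ts)] += amount

    # Elements read back from BackEdgeSource, keyed by the window they fall in.
    back_edge: dict[int, int] = {}
    results: dict[int, int] = {}
    # Windows are handled in order; this is an assumption standing in for the
    # back-edge watermark, which cannot pass window w until w - 5 has emitted.
    for w in range(0, end_ts, WINDOW):
        # CoGBK: join this window's aggregate with the past sum (0 if none yet).
        current_sum = aggregates.get(w, 0) + back_edge.get(w, 0)
        results[w] = current_sum  # --> OriginalSink
        # --> BackEdgeSink: emit at X + 5 min so the sum lands in the *next*
        # window and becomes that window's past sum.
        back_edge[window_start(w + WINDOW)] = current_sum
    return results


print(running_sums([(1, 100), (3, -20), (7, 50), (16, 10)], end_ts=20))
# {0: 80, 5: 130, 10: 130, 15: 140}
```

The window starting at 10 has no events of its own but still emits 130: the back-edge element alone is enough for the CoGBK to produce output for that key and window, which is what keeps the running sum flowing across quiet periods.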

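
The stateful-DoFn alternative (buffer per key in the global window, then apply window results in order) can be sketched the same way. This is again a plain-Python model, not Beam's API: `OrderedBalance`, `process` and `on_timer` are hypothetical names standing in for a stateful DoFn's buffered state and an event-time timer. It assumes each fixed window's aggregate reaches the DoFn before the watermark passes that window's end, which is what the watermark is meant to guarantee for on-time data, so every window that has ended when the timer fires can be applied in window order regardless of arrival order.

```python
import heapq
from dataclasses import dataclass, field


@dataclass
class OrderedBalance:
    """Hypothetical per-key model of a stateful DoFn in the global window."""

    balance: int = 0  # all-time balance for this key
    # Min-heap of (window_start, aggregate) not yet applied; stands in for
    # the DoFn's buffered state.
    buffer: list[tuple[int, int]] = field(default_factory=list)

    def process(self, window_start: int, aggregate: int) -> None:
        """Buffer a fixed-window aggregate; arrival order does not matter."""
        heapq.heappush(self.buffer, (window_start, aggregate))

    def on_timer(self, watermark: int, window_size: int) -> list[tuple[int, int]]:
        """Event-time timer firing at `watermark` (minutes).

        Applies, in window order, every buffered window that ended at or before
        the watermark, emits (window_start, balance) after each one, and removes
        applied entries so the buffer does not grow without bound.
        """
        emitted = []
        while self.buffer and self.buffer[0][0] + window_size <= watermark:
            start, aggregate = heapq.heappop(self.buffer)
            self.balance += aggregate
            emitted.append((start, self.balance))
        return emitted


dofn = OrderedBalance()
# Aggregates arrive out of order; the window starting at 10 is still missing.
dofn.process(5, 50)
dofn.process(0, 80)
dofn.process(15, 10)
print(dofn.on_timer(watermark=10, window_size=5))  # [(0, 80), (5, 130)]
```

Dropping applied windows from the buffer is one way to avoid the state leaks mentioned in the thread; the balance itself is still held per key indefinitely, which an all-time balance needs anyway.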