beam-user mailing list archives

From Robert Bradshaw <>
Subject Re: Global Window - Correctness/Completeness
Date Fri, 25 May 2018 01:00:01 GMT
This is an interesting question.

From a model perspective, one way to represent this is that you're trying
to group all your events into windows with the custom windowing function

    t -> { [-infinity, k*5min) : k*5min > t }

where k varies over all integers. You would then do a per-key sum over
these values. Unfortunately, no Beam runner supports infinite window sets
such as these...
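To make the windowing function concrete, here is a small framework-free Python sketch (plain Python, not Beam's WindowFn API) that lists the cumulative windows [-infinity, k*5min) containing an event at time t. An event falls in such a window exactly when t < k*5min. Because the true set is infinite, the hypothetical `horizon` parameter truncates it. Times are in minutes, and the event data is made up for illustration.

```python
import math
from collections import defaultdict

WINDOW_MIN = 5  # spacing between cumulative window ends, in minutes


def cumulative_window_ends(t, horizon):
    """Return the ends k*5 of every window [-inf, k*5) that contains time t,
    cut off at `horizon` (a hypothetical limit; the real set is infinite)."""
    # t lies in [-inf, k*5) exactly when k*5 > t, so start at the smallest such k.
    first_k = math.floor(t / WINDOW_MIN) + 1
    return [k * WINDOW_MIN for k in range(first_k, horizon // WINDOW_MIN + 1)]


def cumulative_sums(events, horizon):
    """Per-window sum over (timestamp, amount) events: the all-time total up to each end."""
    sums = defaultdict(int)
    for t, amount in events:
        for end in cumulative_window_ends(t, horizon):
            sums[end] += amount
    return dict(sorted(sums.items()))


print(cumulative_window_ends(7, 30))                          # [10, 15, 20, 25, 30]
print(cumulative_sums([(1, 100), (7, -30), (12, 50)], 15))    # {5: 100, 10: 70, 15: 120}
```

Every event is copied into every later window, which is exactly why a runner would need to support an unbounded window set to express this directly.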

As mentioned, you could do this with a stateful DoFn and timers in the
global window. When the partial sum at time T arrives, store it in your
state and set a timer for T. When this timer fires, you know you'll have
received all partial sums up to T (plus possibly some after T), and you can
emit (and store) the cumulative sum(s) up to and including T and purge
the buffer.
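The buffer-and-timer logic can be simulated without Beam. In this hypothetical Python sketch, `CumulativeSumFn` stands in for a per-key stateful DoFn in the global window: `on_partial_sum` plays the role of processing an element (buffering it and "setting a timer" at its timestamp), and `on_watermark` stands in for the runner firing event-time timers once the watermark passes them. None of these names are Beam APIs.

```python
import heapq


class CumulativeSumFn:
    """Hypothetical stand-in for a per-key stateful DoFn with event-time timers
    in the global window (plain Python, not the Beam API)."""

    def __init__(self):
        self.buffer = {}   # state: timestamp -> buffered partial sum
        self.timers = []   # pending event-time timers (min-heap of timestamps)
        self.total = 0     # state: cumulative sum emitted so far

    def on_partial_sum(self, ts, partial):
        # Store the partial sum in state and set a timer for its timestamp.
        self.buffer[ts] = self.buffer.get(ts, 0) + partial
        heapq.heappush(self.timers, ts)

    def on_watermark(self, watermark):
        """Fire every timer <= watermark; return the emitted (ts, cumulative) pairs."""
        out = []
        while self.timers and self.timers[0] <= watermark:
            t = heapq.heappop(self.timers)
            # Everything up to t has arrived: fold it in timestamp order, then purge.
            for ts in sorted(k for k in self.buffer if k <= t):
                self.total += self.buffer.pop(ts)
                out.append((ts, self.total))
        return out


fn = CumulativeSumFn()
fn.on_partial_sum(10, -30)   # partial sums arrive out of order...
fn.on_partial_sum(15, 50)
fn.on_partial_sum(5, 100)
print(fn.on_watermark(10))   # [(5, 100), (10, 70)]
print(fn.on_watermark(20))   # [(15, 120)]
```

Partial sums that arrive after T simply stay buffered until their own timers fire, so the emitted cumulative sums are always in timestamp order.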

On Thu, May 24, 2018 at 5:38 PM Raghu Angadi <> wrote:

> I don't think you need to keep the last 'n' windows in a stateful DoFn in
> order to reorder them. You mentioned your source is ordered, which implies it
> can effectively have a serial id (global or per key). With each 5 minute
> aggregation, you also include the serial-id range. Inside the stateful DoFn,
> you can easily decide whether you can emit immediately (the normal, time-ordered
> case, since the window size is large) or whether you need to buffer.
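The serial-id idea amounts to a reorder buffer. A minimal Python sketch, assuming each 5-minute aggregate carries a hypothetical contiguous `(first_id, last_id)` range of source serial ids; this is plain Python, not Beam's stateful DoFn API, and the numbers are illustrative.

```python
class SerialReorderBuffer:
    """Emit aggregates in serial-id order, buffering any that arrive early."""

    def __init__(self, first_expected_id=0):
        self.next_id = first_expected_id  # first serial id not yet emitted
        self.pending = {}                 # state: first_id -> (last_id, value)
        self.balance = 0                  # running all-time balance

    def process(self, first_id, last_id, value):
        """Accept one aggregate; return the (last_id, balance) pairs now safe to emit."""
        self.pending[first_id] = (last_id, value)
        out = []
        # Normal case: the aggregate is contiguous with what was already emitted,
        # so it goes out immediately. Otherwise it waits for the gap to fill.
        while self.next_id in self.pending:
            last, val = self.pending.pop(self.next_id)
            self.balance += val
            out.append((last, self.balance))
            self.next_id = last + 1
        return out


buf = SerialReorderBuffer()
print(buf.process(0, 9, 100))    # [(9, 100)]
print(buf.process(20, 24, 50))   # [] because ids 10..19 are still missing
print(buf.process(10, 19, -30))  # [(19, 70), (24, 120)]
```

No lookback window of size 'n' is needed: the id ranges alone tell you whether anything is missing.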
> On Wed, May 23, 2018 at 11:40 AM Stephan Kotze <>
> wrote:
>> Indeed, and very true :)
>> More a flight of fancy than anything else.
>> Trying hard to find the right tool for the job, but something like this
>> could potentially (risks aside for this conversation, as it is potentially a
>> hairy implementation with more effort and risk than it's worth) reduce
>> the need for us to either rewrite the app in Flink or do some
>> parts in Beam and others in Flink.
>> Dunno ;)
>> Stephan
>> On Wed, May 23, 2018 at 7:22 PM Lukasz Cwik <> wrote:
>>> You could, but you would need to change how the translation of an Apache Beam
>>> pipeline is done within Flink
>>> (and possibly its superclass). You would need to take care of how
>>> stream.keyBy(...).orderByTime() affects windowing and triggering behavior with
>>> regard to Apache Beam, and you may need to be quite well versed in the Apache
>>> Flink execution model and the Apache Beam model.
>>> You might as well use Flink instead of writing an Apache Beam
>>> pipeline.
>>> On Wed, May 23, 2018 at 10:20 AM Stephan Kotze <> wrote:
>>>> Thanks Lukasz.
>>>> As a hypothetical question (to anyone more familiar with the runner
>>>> than I am):
>>>> If one is using the Flink runner, would it theoretically be possible to
>>>> modify the runner so that it constructs its pipelines as:
>>>> stream.keyBy(...).orderByTime() <>.
>>>> Unless I'm missing something, this could be a cheeky way (bypassing the
>>>> Beam API) to enforce stricter ordering (with all the ensuing costs) on a
>>>> runner that would appear to support the desired semantics?
>>>> Stephan
>>>> On Wed, May 23, 2018 at 5:41 PM Lukasz Cwik <> wrote:
>>>>> The issue with Apache Beam and many stream processing systems is that
>>>>> they don't support back edges, since these produce loops which may or may
>>>>> not be able to be resolved.
>>>>> One way to work around this is to emit your data to a pubsub system
>>>>> like Kafka or GCP Pubsub and read that as a source within your pipeline
>>>>> in addition to your normal source.
>>>>> OriginalSource --> 5 min aggregations --> CoGBK --> current sum = current aggregation + past sum --> OriginalSink
>>>>> BackEdgeSource --------------------------/                                                      \--> BackEdgeSink
>>>>> This allows both the OriginalSource and BackEdgeSource to maintain
>>>>> their own watermarks, and the CoGBK (or Flatten + GBK if the inputs have
>>>>> the same key and value type) to produce output based upon the windowing
>>>>> strategy you define (no manual state/timer management!). It's important
>>>>> that the timestamps of the elements the BackEdgeSource emits fall in the
>>>>> window they will be used in, so for a 5 min aggregation window, if the current
>>>>> sum is at time X, you should emit it at X+5 min to become the past sum for the
>>>>> next window. Similarly, it would be wise to use triggers which only fire
>>>>> once (so no speculative or late firings).
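The timestamp shift on the back edge is what makes the loop work. The Python sketch below simulates it sequentially, with no Kafka, Pub/Sub, or Beam involved; `back_edge` is a plain dict standing in for the BackEdgeSink/BackEdgeSource topic, and the aggregates are made-up numbers. Each window joins its 5-minute aggregate with whatever past sum arrived in that window, and the resulting current sum is re-emitted at X + 5 min so that it lands in the next window. In a real pipeline it would presumably be the BackEdgeSource's watermark that holds the CoGBK for window X+5 until that past sum arrives; here the loop simply visits windows in order.

```python
WINDOW_MIN = 5


def window_start(ts):
    # Start of the fixed 5-minute window containing timestamp ts (minutes).
    return ts - ts % WINDOW_MIN


def run_back_edge_loop(aggregations):
    """aggregations: {window_start: 5-minute aggregate}.
    Returns {window_start: current sum} as it would reach OriginalSink."""
    back_edge = {}  # stand-in for the back-edge topic: window start -> past sum
    sink = {}
    first, last = min(aggregations), max(aggregations)
    for start in range(first, last + WINDOW_MIN, WINDOW_MIN):
        # CoGBK for this window: the 5-min aggregate joined with the past sum.
        # Either side may be missing (e.g. a window with no new events).
        current_sum = aggregations.get(start, 0) + back_edge.pop(start, 0)
        sink[start] = current_sum
        # The current sum is "at time X" (here the window start; any time in the
        # window would do). Emitting it at X + 5 min puts it in the next window.
        x = start
        back_edge[window_start(x + WINDOW_MIN)] = current_sum
    return sink


print(run_back_edge_loop({0: 100, 5: -30, 15: 50}))
# {0: 100, 5: 70, 10: 70, 15: 120}
```

Note that the empty window starting at 10 still produces a sum, because the past sum on the back edge alone is enough for the CoGBK to fire.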
>>>>> If you don't want to use the solution above, then using the global window
>>>>> and a stateful DoFn makes the most sense based upon what you described:
>>>>> "track the past X windows and eventually decide: OK I'm as consistent as I
>>>>> am willing to be, re-order the events, emit results, purge the 'complete set'
>>>>> and ignore any further late arrivals (due to internal Beam processing rather
>>>>> than late data)". Note that a stateful DoFn is partitioned by key and window,
>>>>> so if you use 5 minute windows you'll only see the state that was recorded
>>>>> for that 5 minutes, which is why it's important to use the global window.
>>>>> Also note that it's easy to have "leaks" in stateful DoFns in the global
>>>>> window, since the watermark will never surpass the point in time when the
>>>>> state data would be garbage collected.
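To illustrate why the global window matters, here is a toy Python model (not Beam code) in which state is scoped to (key, window), as it is for a stateful DoFn. With 5-minute fixed windows each window starts from empty state, so the running balance resets; with a single global window it accumulates. The `GLOBAL` sentinel and the `fixed_window` helper are hypothetical, and the events are illustrative.

```python
from collections import defaultdict

GLOBAL = "global"  # hypothetical stand-in for Beam's single global window


def fixed_window(ts, size=5):
    # Start of the fixed window (in minutes) containing ts.
    return ts - ts % size


def running_balances(events, window_fn):
    """Keep a running sum in state scoped to (key, window), like a stateful DoFn."""
    state = defaultdict(int)  # (key, window) -> running sum
    out = []
    for key, ts, amount in events:
        cell = (key, window_fn(ts))
        state[cell] += amount
        out.append((key, ts, state[cell]))
    return out


events = [("alice", 1, 100), ("alice", 7, -30), ("alice", 12, 50)]
print(running_balances(events, fixed_window))       # state resets in every window
print(running_balances(events, lambda ts: GLOBAL))  # state accumulates
```

The same model shows the leak: in the global window nothing ever clears the `state` cells, so a real DoFn would have to clear entries it no longer needs itself.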
>>>>> You might have better success with a simpler application, though; it
>>>>> all depends on what the rest of the system looks like and what you're
>>>>> integrating with.
>>>>> On Wed, May 23, 2018 at 4:43 AM Stephan Kotze <> wrote:
>>>>>> Hi
>>>>>> We are trying to implement a scenario that requires a rigid order of
>>>>>> events arriving into a global aggregation.
>>>>>> To protect the innocent, I've simplified and modified the exact
>>>>>> scenario into something like the following:
>>>>>>    1. I want to calculate, for example, a client's all-time bank
>>>>>>    balance.
>>>>>>    2. At fixed intervals, I would like to emit this all-time
>>>>>>    balance and join it onto other fixed-window aggregates.
>>>>>> In principle something like this:
>>>>>> [image: image.png]
>>>>>> Now the problem is that, based on our reading and experimenting, the
>>>>>> order in which the fixed windows arrive in the global window does not
>>>>>> seem to be guaranteed.
>>>>>> This means that something like the following could potentially occur:
>>>>>> [image: image.png]
>>>>>> Essentially, even though the events/fixed windows are ordered, we could
>>>>>> at any given point in time emit an incorrect balance for a particular
>>>>>> person (even though our underlying events are ordered and no late data
>>>>>> can arrive).
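The failure mode described here can be reproduced in a few lines of plain Python (no Beam involved; the window aggregates are made-up illustrative numbers). If a global running balance is emitted each time a fixed-window aggregate arrives, an out-of-order arrival makes some emitted balances wrong even though the final total is right.

```python
def emit_balances(arrivals):
    """Emit (window_end, running balance) each time a window aggregate arrives."""
    balance = 0
    emitted = []
    for window_end, amount in arrivals:
        balance += amount
        emitted.append((window_end, balance))
    return emitted


in_order = [(5, 100), (10, -30), (15, 50)]
out_of_order = [(5, 100), (15, 50), (10, -30)]  # window 15 overtakes window 10

print(emit_balances(in_order))      # [(5, 100), (10, 70), (15, 120)]
print(emit_balances(out_of_order))  # [(5, 100), (15, 150), (10, 120)]
```

In the second run the balance reported for window 15 is 150 instead of 120, and the one for window 10 is 120 instead of 70.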
>>>>>> Given our current understanding of how the compute gets parallelised
>>>>>> (which is how Beam achieves its high throughput), this appears unavoidable.
>>>>>> Unfortunately, we do require a globally correct balance every time we
>>>>>> emit the value.
>>>>>> Are we missing something?
>>>>>> Does one need to sacrifice this consistency if using Beam?
>>>>>> Must one write some sort of a stateful DoFn that keeps track of the
>>>>>> past X windows and eventually decides: OK I'm as consistent as I am willing
>>>>>> to be, re-order the events, emit results, purge the "complete set",
>>>>>> ignore any further late arrivals (due to internal Beam processing rather
>>>>>> than late data) and move along?
>>>>>> Or is there something else we can do? (Tweak some options :), or move
>>>>>> this calc into another streaming framework with stricter guarantees, for
>>>>>> example.)
>>>>>> It seems like such an obvious use case at first glance, but learning
>>>>>> more about Beam seems to indicate that this is not actually a use case
>>>>>> that it is well suited for?
>>>>>> Stephan
