beam-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Reza Rokni <>
Subject Re: Global Window - Correctness/Completeness
Date Fri, 25 May 2018 02:33:46 GMT

Some code that I am working on but not yet fully tested which might be

In the Dataflow runner timers will fire in order... So the code that I am
playing with to solve a similar type of use case (if I have understood your
use case correctly...)

When you enter the Keyed State on the Global Window , set a timer that will
fire in the next Fixed window (lower boundary + x).
Attach all accum objects to a List<Accum> in the Keyed State.

OnTimer fire()
 Read all elements in list
Output results...

Hope to blog about this soon if the testing doesnt find any issues...

On 25 May 2018 at 09:00, Robert Bradshaw <> wrote:

> This is an interesting question.
> From a model perspective, one way to represent this is that you're trying
> to group all your events into windows with the custom windowing function
>     t -> { [-infinity, k*5min : k*5min < t] }
> where k varies over all integers. You would then do a per-key sum over
> these values. Unfortunately, no Beam runner supports infinite window sets
> such as these...
> As mentioned, you could do this with a stateful DoFn and timers in the
> global window. When a the partial sum at time T arrives, store it to your
> state, and set a timer for T. When this timer fires, you know you'll have
> received all partial sums up to T (plus possibly some after T), and you can
> emit (and store) the cumulative sum(s) at up to and including T and purge
> the buffer.
> On Thu, May 24, 2018 at 5:38 PM Raghu Angadi <> wrote:
>> I don't think you need to keep last 'n' windows with Stateful DoFn in
>> order to reorder. You mentioned your source is ordered, which implies it
>> can effectively have a serial id (global or per key). With each 5 minute
>> aggregation you also include the serial-id range. Inside the StatefulDoFn,
>> you can easily decide if you can immediately emit (in the normal case of
>> time order, since window size is large), or if you need to buffer.
>> On Wed, May 23, 2018 at 11:40 AM Stephan Kotze <>
>> wrote:
>>> Indeed, and very true :)
>>> More a flight of fancy, than anything else.
>>> Trying hard to find the right tool for the job, but something like this
>>> could potentially (risks aside for this conversation as it is potentially a
>>> hairy implementation with more effort and risk than worth the reward)
>>> reduce the need for us to either, rewrite the app in Flink, or do some
>>> parts in Beam and others in Flink.
>>> Dunno ;)
>>> Stephan
>>> On Wed, May 23, 2018 at 7:22 PM Lukasz Cwik <> wrote:
>>>> You could but would need to change how the translation of an Apache
>>>> Beam pipeline is done within Flink
>>>> 8345991842dbc1f3cd8c2902ea8989bb0a8e81fb/runners/flink/src/
>>>> main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslat
>>>> (and possibly it's superclass). You would need to take care of how
>>>> stream.key().orderByTime() affects windowing and triggering behavior with
>>>> regards to Apache Beam and may need to be quite well versed in the Apache
>>>> Flink execution model and Apache Beam model.
>>>> You might as well as use Flink instead of writing an Apache Beam
>>>> pipeline.
>>>> On Wed, May 23, 2018 at 10:20 AM Stephan Kotze <
>>>>> wrote:
>>>>> Thanks Lukasz.
>>>>> As a Hypothetical question (to anyone more familiar with the runner
>>>>> than I):
>>>>> If one is using the Flink runner, would it theoretically be possible
>>>>> to modify the runner so that it constructs its pipelines as:
>>>>> stream.keyBy(...).orderByTime()
>>>>> <>
>>>>> .
>>>>> Unless I'm missing something, this could be a cheeky way (bypassing
>>>>> the Beam API) to enforce stricter ordering (with all the ensuing costs)
>>>>> a runner that would appear to support the desired semantics?
>>>>> and+Order+in+Streams
>>>>> Stephan
>>>>> On Wed, May 23, 2018 at 5:41 PM Lukasz Cwik <>
>>>>>> The issue with Apache Beam and many stream processing systems is
>>>>>> they don't support back edges since they produce loops which may
or may not
>>>>>> be able to be resolved.
>>>>>> One way to work around this is to emit your data to a pubsub system
>>>>>> like Kafka or GCP Pubsub and read that as a source within your pipeline
>>>>>> addition to your normal source.
>>>>>> OriginalSource --> 5 min aggregations --> CoGBK --> current
sum =
>>>>>> current aggregation + past sum  --> OriginalSink
>>>>>> BackEdgeSource -------------------------/
>>>>>>                              \-- BackEdgeSink
>>>>>> This allows for both the OriginalSource and BackEdgeSource to
>>>>>> maintain their own watermarks and for the CoGBK (or Flatten + GBK
if the
>>>>>> input has the same key and value type) to produce output based upon
>>>>>> windowing strategy you define (no manual state/timers management
!). Its
>>>>>> important that the BackEdgeSource's elements timestamps that are
>>>>>> are for the window it will be used in, so for a 5 min aggregation
window if
>>>>>> the current sum is at time X, you should emit it at X+5 to become
the past
>>>>>> sum for the next window. Similarly it would be wise to use triggers
>>>>>> only fire once (so no speculative or late firings).
>>>>>> If you don't want to use the solution above, then using Global
>>>>>> windows and stateful DoFn makes the most sense based upon what you
>>>>>> described using "track the past X windows and eventually decides:
>>>>>> I'm as consistent as I am willing to be, Re-order the events, emit
>>>>>> purge the "complete set" and ignore any further late (due to internal
>>>>>> processing rather than late data). Note that stateful DoFn is partitioned
>>>>>> by key and window so if you use 5 minute windows you'll only see
the state
>>>>>> that that was recorded for that 5 mins which is why its important
to use
>>>>>> the global window, also note that its easy to have "leaks" in stateful
>>>>>> DoFns since the watermark will never surpass the point in time when
>>>>>> state data would be garbage collected.
>>>>>> You might have better success with a simpler application though,
>>>>>> all depends on what the rest of the system looks like and what your
>>>>>> integrating with.
>>>>>> On Wed, May 23, 2018 at 4:43 AM Stephan Kotze <
>>>>>>> wrote:
>>>>>>> Hi
>>>>>>> We are trying to implement a scenario that requires a rigid order
>>>>>>> events arriving into a Global Aggregation.
>>>>>>> To protect the innocent, I've simplified and modified the exact
>>>>>>> scenario into something like the following:
>>>>>>>    1. I want to calculate a Client's all time Bank balance for
>>>>>>>    example.
>>>>>>>    2. I would like to at fixed intervals, emit this all time
>>>>>>>    Balance, and join it onto other Fixed window aggregates.
>>>>>>> In principle something like this:
>>>>>>> [image: image.png]
>>>>>>> Now the problem is, that it would seem that the order in which
>>>>>>> fixed windows arrive in the Global window is not Guaranteed (given
>>>>>>> reading/experimenting).
>>>>>>> This means that something like the following could potentially
>>>>>>> [image: image.png]
>>>>>>> Essentially, even though the events/fixed windows are ordered,
>>>>>>> can at any given point in time omit an incorrect balance for
the particular
>>>>>>> person. (Even though our underlying events are ordered, and no
late data
>>>>>>> can arrive).
>>>>>>> Given our current understanding  of how the compute gets
>>>>>>> parallelised (which is how Beam Achieves its high throughput)
this appears
>>>>>>> unavoidable.
>>>>>>> Unfortunately, we do require a globally correct balance every
>>>>>>> we omit the value.
>>>>>>> Are we missing something?
>>>>>>> Does one need to sacrifice this consistency if using Beam?
>>>>>>> Must one write some sort of a stateful DoFn that keeps track
of the
>>>>>>> past X windows and eventually decides: OK I'm as consistent as
I am willing
>>>>>>> to be, Re-order the events, emit results, purge the "complete
set" and
>>>>>>> ignore any further late (due to internal beam processing rather
than late
>>>>>>> data) and move along?
>>>>>>> Or is there something else we can do? (,Tweak some options :)
>>>>>>> Move this calc into another streaming framework, with stricter
>>>>>>> for example).
>>>>>>> It seams like such an obvious use case at first glance, but learning
>>>>>>> more about Beam, seems to indicate that this is not actually
a use case
>>>>>>> that it is well suited for?
>>>>>>> Stephan


This email may be confidential and privileged. If you received this
communication by mistake, please don't forward it to anyone else, please
erase all copies and attachments, and please let me know that it has gone
to the wrong person.

The above terms reflect a potential business arrangement, are provided
solely as a basis for further discussion, and are not intended to be and do
not constitute a legally binding obligation. No legally binding obligations
will be created, implied, or inferred until an agreement in final form is
executed in writing by all parties involved.

View raw message