beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Kenneth Knowles (JIRA)" <>
Subject [jira] [Commented] (BEAM-2402) Support AfterPane.elementGapAtMost() trigger and its combination with elementCountAtLeast()
Date Fri, 02 Jun 2017 03:51:04 GMT


Kenneth Knowles commented on BEAM-2402:

Reading your code to understand this, it seems the goal of the trigger is:

1. Fire if some element count is exceeded.
2. Fire if the processing time between two element arrivals is a certain gap.

So I have a couple high level comments:

* The code you shared will fire only when an element comes in, so it will be higher latency
than a timer. I understand you did it because timers were not performant.
* Can you still combine these two criteria with {{AfterFirst.of}} ?
* The other issue is that {{shouldFire}} should be strictly a function of current state, not
instance fields. It seems you did this to avoid saving data to state? But state can/should
be cached in memory until commit, which means the "WARN" bit that you commented on should
be redundant. You could achieve something similar but more robust (still kind of a hack) by
setting {{shouldFire}} to a {{ValueState<Boolean>}} and just reading it. We want this
sort of thing to be not necessary, since it is basically the same as the simper way of just
reading the current situation to decide whether to fire.

And if/when you propose a PR, please also include the SDK side concrete syntax for the trigger.

Perhaps the timer scan you refer to can be addressed at the Flink level - what do you think,

> Support AfterPane.elementGapAtMost() trigger and its combination with elementCountAtLeast()
> -------------------------------------------------------------------------------------------
>                 Key: BEAM-2402
>                 URL:
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Pei He
>            Assignee: Pei He
> We need a timestamp-driven trigger to use as a cheaper (or more efficient) version of
the ProcessingTime trigger.
> The problem of using ProcessingTime trigger is that current runners' supports are not
very efficient, and couldn't work for pipelines that have lots of keys (for example, flink
runner will scan timers for all keys when watermark advance).
> We have used AfterPane.elementGapAtMost() trigger in our production, and want to merge
it back. And, we believe it could be the solution for people who have the similar issue.
> Implementation for reference:

This message was sent by Atlassian JIRA

View raw message