beam-commits mailing list archives

From "Stas Levin (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (BEAM-2859) Processing time based timers are not properly fired in case the watermark stays put
Date Thu, 07 Sep 2017 17:18:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-2859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stas Levin updated BEAM-2859:
-----------------------------
    Description: 
{{AfterProcessingTime}} based timers are not fired when the input watermark does not advance,
preventing buffered elements from being emitted.

The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines which
timers are ready to be processed by using the following condition:

{code:java}
timer.getTimestamp().isBefore(inputWatermark)
{code}

However, if the timer domain is {{TimeDomain.PROCESSING_TIME}}, the position of the input watermark
should *NOT* have any effect.
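
A minimal sketch of a domain-aware readiness check might look like the following. It is an illustration only, not the runner's code: {{SketchTimeDomain}}, {{SketchTimer}} and {{TimerEligibility}} are hypothetical stand-ins for the Beam types, and the assumption is that a processing-time timer is compared against the current processing time while an event-time timer keeps the watermark comparison quoted above.

```java
import java.time.Instant;

/** Hypothetical stand-in for a timer's time domain; NOT the Beam TimeDomain enum. */
enum SketchTimeDomain { EVENT_TIME, PROCESSING_TIME }

/** Hypothetical stand-in for a timer: just a timestamp and a domain. */
final class SketchTimer {
    final Instant timestamp;
    final SketchTimeDomain domain;

    SketchTimer(Instant timestamp, SketchTimeDomain domain) {
        this.timestamp = timestamp;
        this.domain = domain;
    }
}

/** Illustrative helper; the name and signature are assumptions, not runner API. */
final class TimerEligibility {
    private TimerEligibility() {}

    /**
     * Picks the reference clock from the timer's own domain, so a stalled
     * input watermark cannot hold back a processing-time timer.
     */
    static boolean isReady(SketchTimer timer, Instant inputWatermark, Instant processingTime) {
        Instant reference =
            timer.domain == SketchTimeDomain.PROCESSING_TIME ? processingTime : inputWatermark;
        return timer.timestamp.isBefore(reference);
    }
}
```

With a watermark stuck at t=1000 and a processing time of t=5000, a processing-time timer set for t=2000 is reported as ready, whereas an event-time timer with the same timestamp is not.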

In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are
deemed eligible for processing, even though they will not necessarily fire.
This may not be the correct behavior for timers in general, and for timers in the {{TimeDomain.PROCESSING_TIME}}
domain in particular, since they should remain scheduled until the corresponding window expires and
all state is cleared.
For instance, consider a timer that is found eligible for processing and is thus deleted.
If its {{shouldFire()}} then happens to return {{false}}, it needs to be re-run next time around,
but it will not be, since it has already been deleted.

It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}}
and leave timer management up to {{ReduceFnRunner#clearAllState()}}, which has more context
to determine whether it's time for a given timer to be deleted.
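
The idea of leaving timers scheduled can be sketched as follows. This is a toy model under stated assumptions, not the Spark runner's implementation: {{RetainingTimerStore}} and its methods are hypothetical, timers are reduced to bare timestamps, and {{clearAll()}} merely stands in for whatever cleanup runs when the window's state is cleared.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical timer store that never deletes timers as a side effect of a lookup. */
final class RetainingTimerStore {
    // Timers are modelled as bare timestamps to keep the sketch small.
    private final Set<Instant> processingTimers = new LinkedHashSet<>();

    void schedule(Instant timestamp) {
        processingTimers.add(timestamp);
    }

    /** Returns the timers that are due, but leaves every one of them scheduled. */
    List<Instant> readyTimers(Instant processingTime) {
        List<Instant> ready = new ArrayList<>();
        for (Instant timestamp : processingTimers) {
            if (timestamp.isBefore(processingTime)) {
                ready.add(timestamp);
            }
        }
        return ready;
    }

    /** Stand-in for the cleanup step: timers go away only when state is cleared. */
    void clearAll() {
        processingTimers.clear();
    }

    int size() {
        return processingTimers.size();
    }
}
```

Because {{readyTimers()}} has no side effects, a timer whose trigger declines to fire is still returned on the next pass. The trade-off is that the caller must tolerate seeing the same timer repeatedly until cleanup runs.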

  was:
{{AfterProcessingTime}} based timers are not fired when the input watermark does not advance,
preventing from buffered element to be emitted.

The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines what
triggers are ready to be processed by using the following condition: 

{code:java}
timer.getTimestamp().isBefore(inputWatermark)
{code}

However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of the input watermark
should *NOT* have effect.

In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are
deemed eligible for processing (but will not necessarily fire). 
This may not be the correct behavior for timers in general and for timers in the {{TimeDomain.PROCESSING_TIME}}
in particular, since they should remain scheduled until the corresponding window expires and
all state is cleared.
For instance, consider a timer that is found eligible for processing is thus deleted, then
it just so happens to be that its {{shouldFire()}} returns {{false}} and it needs to be re-run
next time around, but won't since it's been deleted.

It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}}
and leave timer management up to {{ReduceFnRunner#clearAllState()}} which has more context
to determine whether it's time for a given timer to deleted.


> Processing time based timers are not properly fired in case the watermark stays put
> -----------------------------------------------------------------------------------
>
>                 Key: BEAM-2859
>                 URL: https://issues.apache.org/jira/browse/BEAM-2859
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: Stas Levin
>            Assignee: Stas Levin
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
