beam-commits mailing list archives

From "Aljoscha Krettek (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
Date Fri, 30 Jun 2017 09:50:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069802#comment-16069802 ]

Aljoscha Krettek commented on BEAM-2140:
----------------------------------------

Yep, in the Flink Runner the processing path for {{ProcessFn}} contains {{StatefulDoFnRunner}},
which is why timers were dropped once the input watermark went to +Inf. I fixed this in the
branch I posted earlier by changing {{SplittableDoFnOperator}} so that it no longer uses that
code path and instead uses completely custom code for processing a splittable DoFn: https://github.com/apache/beam/blob/10b1b598100541ff37734a04850ada45fc362b99/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java#L73-L73.
This fixed the problem of dropped processing-time timers.

The other problem (in the Flink Runner) was that processing-time timers are simply dropped
if the pipeline is shutting down. I'm getting around this by setting a "last resort" event-time
timer that fires when the watermark goes to +Inf. There I'm processing the remaining restrictions
until they're exhausted. Splittable DoFn processing in the {{SplittableDoFnOperator}} is now
split (hehe) into three methods:
 * {{processElement()}}: seed state and process restriction once, set next processing-time
timer and set last resort event-time timer
 * {{onProcessingTime()}}: process restriction once and set next processing-time timer, clean
up all state if restriction is exhausted
 * {{onEventTime()}}: process restriction until exhausted, clean up all state
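
To make the division of work between the three methods concrete, here is a minimal sketch in
plain Java. It is not the code from the branch and uses no Beam or Flink API: the class name
{{LastResortTimerSketch}}, the {{chunkSize}} parameter, the offset-range restriction and the
boolean timer flags are all made up for illustration. It also assumes that a timer firing after
the state has been cleaned up is simply ignored.

```java
/** Hypothetical stand-in for the operator logic; not Beam or Flink API. */
final class LastResortTimerSketch {
    private final long chunkSize;           // work claimed by one "process restriction once" call
    private long position;                  // restriction state: next offset to process
    private long end;                       // restriction state: exclusive end offset
    private boolean hasState;               // true while element/restriction state exists
    private long processed;                 // total offsets processed, for inspection
    private boolean processingTimeTimerSet; // pending processing-time timer
    private boolean lastResortTimerSet;     // pending event-time timer for watermark +Inf

    LastResortTimerSketch(long chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.chunkSize = chunkSize;
    }

    /** Seed state, process once, set the next processing-time timer and the last resort timer. */
    void processElement(long from, long to) {
        if (from > to) {
            throw new IllegalArgumentException("from must not exceed to");
        }
        position = from;
        end = to;
        hasState = true;
        processRestrictionOnce();
        processingTimeTimerSet = true;
        lastResortTimerSet = true;
    }

    /** Process once; re-arm the timer, or clean up if the restriction is exhausted. */
    void onProcessingTime() {
        processingTimeTimerSet = false;
        if (!hasState) {
            return; // assumption: a timer that fires after cleanup is a no-op
        }
        processRestrictionOnce();
        if (isExhausted()) {
            cleanUp();
        } else {
            processingTimeTimerSet = true;
        }
    }

    /** Last resort: the watermark reached +Inf, so drain the restriction and clean up. */
    void onEventTime() {
        lastResortTimerSet = false;
        if (!hasState) {
            return; // already finished via processing-time timers
        }
        while (!isExhausted()) {
            processRestrictionOnce();
        }
        cleanUp();
    }

    private void processRestrictionOnce() {
        long claimedEnd = Math.min(end, position + chunkSize);
        processed += claimedEnd - position;
        position = claimedEnd;
    }

    private boolean isExhausted() {
        return position >= end;
    }

    private void cleanUp() {
        hasState = false;
        processingTimeTimerSet = false;
    }

    long processed() { return processed; }
    boolean hasState() { return hasState; }
    boolean processingTimeTimerSet() { return processingTimeTimerSet; }
    boolean lastResortTimerSet() { return lastResortTimerSet; }
}
```

In this sketch, a shutdown that drops the pending processing-time timer corresponds to never
calling {{onProcessingTime()}} again. The later {{onEventTime()}} call still drains whatever is
left of the restriction, which is the point of the last resort timer.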

There is no way of getting around Flink dropping processing-time timers, so if we want to get
the Flink Runner to directly use {{ProcessFn}} we should add this "last resort" timer there
as well. I think it makes sense to have this in general anyway. [~jkff] what do you think
about this?

Regarding state leakage: AFAIK a splittable DoFn is not allowed to have any custom state or
timers, right? And {{ProcessFn}} makes sure to clean up the element state and restriction state
when a restriction is exhausted, so there should be no state leakage, right?

[~jkff] You mentioned that "the input watermark is held by "pending elements"". Is this true?
I thought that only the output watermark is held by pending elements.

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
>                 Key: BEAM-2140
>                 URL: https://issues.apache.org/jira/browse/BEAM-2140
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled the tests
> to unblock the open PR for BEAM-1763.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
