beam-commits mailing list archives

From "Aljoscha Krettek (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
Date Tue, 09 May 2017 09:38:04 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16002370#comment-16002370 ]

Aljoscha Krettek commented on BEAM-2140:
----------------------------------------

I think this is mostly right, yes. 😃

1. Why should it not wrap {{StatefulDoFnRunner}}? I think it's the easiest way to get timers
and state that expires.
2. Yes, I think this is a problem.
3. Also a problem: when we shut down, we should check whether there are any outstanding processing-time
timers and fire them.

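To illustrate point 3, here is a minimal sketch of draining outstanding processing-time timers at shutdown. It is a plain-Java model, not FlinkRunner or Beam code: the class {{PendingTimers}} and its methods are hypothetical names invented for this example, and a real operator would invoke timer callbacks instead of returning timestamps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical model of an operator's processing-time timers (not FlinkRunner code). */
class PendingTimers {
    // Timers ordered by the processing time (in ms) at which they should fire.
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void register(long fireAtMillis) {
        timers.add(fireAtMillis);
    }

    /** Normal operation: fire only the timers that are due at the given processing time. */
    List<Long> fireDue(long nowMillis) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= nowMillis) {
            fired.add(timers.poll());
        }
        return fired;
    }

    /** Shutdown: fire every outstanding timer in timestamp order, regardless of the clock. */
    List<Long> drainOnShutdown() {
        return fireDue(Long.MAX_VALUE);
    }

    boolean isEmpty() {
        return timers.isEmpty();
    }

    public static void main(String[] args) {
        PendingTimers timers = new PendingTimers();
        timers.register(300L);
        timers.register(100L);
        timers.register(200L);
        System.out.println("fired at t=150: " + timers.fireDue(150L));
        System.out.println("fired on shutdown: " + timers.drainOnShutdown());
    }
}
```

The point of the sketch is only that shutdown treats every remaining timer as due, so nothing that a timer would have emitted is lost when the operator closes.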
I'm assuming you read the little analysis I did on Aviem's PR; pasting it here just in case:
{quote}
@jkff SplittableDoFnTest.testOutputAfterCheckpoint() fails on the Flink Runner. If you instrument
the test (with a little sysout printing in a ParDo) you see that the SDF only emits elements
up to 12344. If, in BoundedSourceWrapper (which executes the Create.of("foo") on Flink), you
replace the end of the run() method by
{code}
    // emit final Long.MAX_VALUE watermark, just to be sure
    // ctx.emitWatermark(new Watermark(Long.MAX_VALUE));

    while (isRunning) {
      Thread.sleep(500);
    }
{code}
you see all values correctly emitted, but the test never stops because the source is now unbounded.
(Interestingly, if you replace that one line by ctx.emitWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
you get values up to 34567.)

The problem seems to be that the code running the SDF sees the high watermarks and then doesn't
emit stuff anymore. Do you have an idea what could be going on there? If not we'll probably
have to delve deeper into the code.
{quote}

So fiddling with the watermarks in {{BoundedSourceWrapper}} also seems to affect what is getting
processed/emitted by the splittable DoFn (SDF).
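For reference, the two final watermark values tried in the quoted experiment are different numbers. The sketch below compares them in plain Java, under the assumption that Beam defines {{BoundedWindow.TIMESTAMP_MAX_VALUE}} as {{Long.MAX_VALUE}} microseconds converted to milliseconds. The class and constant names are hypothetical, and the sketch does not claim that this difference explains the observed behaviour.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper comparing the two watermark values from the experiment. */
class WatermarkValues {
    // Value used in the original line: new Watermark(Long.MAX_VALUE).
    static final long FLINK_MAX_WATERMARK = Long.MAX_VALUE;

    // Assumption: Beam's TIMESTAMP_MAX_VALUE is Long.MAX_VALUE microseconds,
    // expressed in milliseconds.
    static final long BEAM_MAX_TIMESTAMP_MILLIS =
        TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);

    public static void main(String[] args) {
        System.out.println("Long.MAX_VALUE watermark:    " + FLINK_MAX_WATERMARK);
        System.out.println("Beam max timestamp (ms):     " + BEAM_MAX_TIMESTAMP_MILLIS);
        System.out.println("Beam max is strictly lower:  "
            + (BEAM_MAX_TIMESTAMP_MILLIS < FLINK_MAX_WATERMARK));
    }
}
```

If the assumption holds, code that compares an incoming watermark against Beam's maximum timestamp will see the two variants differently, which may be a useful place to start looking.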

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
>                 Key: BEAM-2140
>                 URL: https://issues.apache.org/jira/browse/BEAM-2140
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled the tests
> to unblock the open PR for BEAM-1763.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
