flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7666) ContinuousFileReaderOperator swallows chained watermarks
Date Wed, 25 Oct 2017 09:14:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16218276#comment-16218276 ]

ASF GitHub Bot commented on FLINK-7666:

GitHub user kl0u opened a pull request:


    [FLINK-7666] Close TimeService after closing operators.

    R @aljoscha 
    ## What is the purpose of the change
    Splits `quiesceAndAwaitPending()` of the `TimerService` into two methods, `quiesce()` and `wait()`, and calls them **after** closing the operators. `quiesce()` is called while holding the `checkpointLock`; `wait()` is called without holding it.
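The Java sketch below shows the quiesce/wait split. It assumes that timer callbacks run under the checkpoint lock, which would mean that waiting for pending timers while holding that lock would block. This is a plausible reading of the locking rule above, not something the PR states. `SketchTimerService`, `registerTimer`, `quiesce` and `awaitPending` are hypothetical names, not Flink's `ProcessingTimeService` API.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, simplified timer service (NOT Flink's ProcessingTimeService).
 * It only mirrors splitting one blocking shutdown call into quiesce() and a
 * separate wait step.
 */
final class SketchTimerService {
    private final Object checkpointLock;
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    private volatile boolean quiesced = false;

    SketchTimerService(Object checkpointLock) {
        this.checkpointLock = checkpointLock;
    }

    /** Schedules a callback; returns false if the service no longer accepts timers. */
    boolean registerTimer(long delayMillis, Runnable callback) {
        if (quiesced) {
            return false;
        }
        try {
            executor.schedule(() -> {
                // Assumption: callbacks run under the checkpoint lock, like operator code.
                synchronized (checkpointLock) {
                    callback.run();
                }
            }, delayMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (RejectedExecutionException e) {
            return false; // raced with quiesce()
        }
    }

    /** Step 1: refuse new timers. Non-blocking, so it is safe to call under the lock. */
    void quiesce() {
        quiesced = true;
        // By default, already-scheduled delayed tasks still run after shutdown().
        executor.shutdown();
    }

    /**
     * Step 2: block until already-scheduled timers have run, or until the timeout.
     * The caller must NOT hold the checkpoint lock: a pending callback needs it to finish.
     */
    boolean awaitPending(long timeoutMillis) {
        try {
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In this sketch, suppose a timer is still pending. Calling `awaitPending` inside `synchronized (lock)` then returns `false` only after the timeout, because the pending callback cannot acquire the lock. Calling it after the lock is released lets the callback run, and the wait completes.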
    The original problem was that the `StreamTask` called `quiesceAndAwaitPending()` on the `TimerService` before calling `close()` on the operators. For continuous file reading with a periodic watermark emitter and a small file (e.g. a single split), this meant that the timer service was shut down before reading had even started (as soon as the reader received the first split), so no timers could be registered to emit watermarks.
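The ordering problem can be reproduced with a single-threaded toy model. This is a hypothetical sketch, not Flink code, and `Timers`, `runTask` and `ShutdownOrderSketch` are illustrative names. The model makes two simplifications. First, the reader's remaining work happens inside its `close()` call. Second, the chained assigner asks for a single watermark timer, which is refused once the timer service is quiesced.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, single-threaded model of the task shutdown order described
 * in FLINK-7666. None of these classes are Flink APIs.
 */
final class ShutdownOrderSketch {

    /** Toy timer service: refuses new timers once quiesced. */
    static final class Timers {
        private final List<Runnable> pending = new ArrayList<>();
        private boolean quiesced;

        boolean register(Runnable callback) {
            if (quiesced) {
                return false; // registration refused: the watermark timer is lost
            }
            pending.add(callback);
            return true;
        }

        void quiesce() {
            quiesced = true;
        }

        /** Runs every timer accepted before quiesce(); stands in for waiting on them. */
        void awaitPending() {
            List<Runnable> toRun = new ArrayList<>(pending);
            pending.clear();
            toRun.forEach(Runnable::run);
        }
    }

    /** Returns the watermarks emitted by the chained assigner for one split. */
    static List<Long> runTask(boolean closeOperatorsFirst, long[] timestampsInSplit) {
        Timers timers = new Timers();
        List<Long> emittedWatermarks = new ArrayList<>();
        long[] maxTimestamp = {Long.MIN_VALUE};
        boolean[] timerArmed = {false};

        // Reader close(): drain the split. The chained assigner tracks the max
        // timestamp and arms one timer that will emit it as a watermark.
        Runnable closeReaderAndAssigner = () -> {
            for (long ts : timestampsInSplit) {
                maxTimestamp[0] = Math.max(maxTimestamp[0], ts);
                if (!timerArmed[0]) {
                    timerArmed[0] = timers.register(() -> emittedWatermarks.add(maxTimestamp[0]));
                }
            }
        };

        if (closeOperatorsFirst) {      // order after the fix
            closeReaderAndAssigner.run();
            timers.quiesce();           // in the PR: called under the checkpoint lock
            timers.awaitPending();      // in the PR: called without the lock
        } else {                        // order before the fix
            timers.quiesce();
            timers.awaitPending();
            closeReaderAndAssigner.run();
        }
        return emittedWatermarks;
    }

    public static void main(String[] args) {
        long[] split = {1_000L, 2_000L, 3_000L};
        System.out.println("before fix: " + runTask(false, split));
        System.out.println("after fix:  " + runTask(true, split));
    }
}
```

Running `main` prints `before fix: []` and then `after fix:  [3000]`. Under the old order, the assigner's timer registration is refused, so no watermark is ever emitted.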
    ## Verifying this change
    Added a test to `OneInputStreamTaskTest`.
    ## Documentation
      - Does this pull request introduce a new feature? NO

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink fs-reader

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4900


> ContinuousFileReaderOperator swallows chained watermarks
> --------------------------------------------------------
>                 Key: FLINK-7666
>                 URL: https://issues.apache.org/jira/browse/FLINK-7666
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming Connectors
>    Affects Versions: 1.3.2
>            Reporter: Ufuk Celebi
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>             Fix For: 1.4.0
> I use event time and read from a (finite) file. I assign watermarks right after the {{ContinuousFileReaderOperator}} with parallelism 1.
> {code}
> env
>   .readFile(new TextInputFormat(...), ...)
>   .setParallelism(1)
>   .assignTimestampsAndWatermarks(...)
>   .setParallelism(1)
>   .map()...
> {code}
> The watermarks I assign never progress through the pipeline.
> I can work around this by inserting a {{shuffle()}} after the file reader or starting a new chain at the assigner:
> {code}
> env
>   .readFile(new TextInputFormat(...), ...)
>   .setParallelism(1)
>   .shuffle() 
>   .assignTimestampsAndWatermarks(...)
>   .setParallelism(1)
>   .map()...
> {code}

This message was sent by Atlassian JIRA
