flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full
Date Mon, 30 Oct 2017 15:05:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225085#comment-16225085 ]

ASF GitHub Bot commented on FLINK-7949:

GitHub user bartektartanus opened a pull request:


    [FLINK-7949] AsyncWaitOperator is not restarting when queue is full

    The emitter thread is now started BEFORE the queue is refilled with recovered elements.
    Issue description:
    During a restart, if the queue was full (with N elements) and there was a pending
    element waiting to be added to the queue, the recovered state held N+1 elements. The
    queue could not fit all of them, so the thread was blocked forever. As Till Rohrmann suggested here:
    I've changed the order of this code to start the emitter thread earlier.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bartektartanus/flink master

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4924

commit 97620649ddfcf8f0320b20bdfdb69d9b44dd8f0c
Author: Bartłomiej Tartanus <bartektartanus@gmail.com>
Date:   2017-10-30T14:39:43Z

    start emmiter thread BEFORE filling up the queue of recovered elements


> AsyncWaitOperator is not restarting when queue is full
> ------------------------------------------------------
>                 Key: FLINK-7949
>                 URL: https://issues.apache.org/jira/browse/FLINK-7949
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.2
>            Reporter: Bartłomiej Tartanus
>   Original Estimate: 0.25h
>  Remaining Estimate: 0.25h
> The issue was described here:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
> Issue: AsyncWaitOperator can't restart properly after a failure (a thread waits forever).
> Scenario to reproduce this issue:
> 1. The queue is full (let's assume that its capacity is N elements).
> 2. There is a pending element waiting, so the
> pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and the
> while-loop in the addAsyncBufferEntry method is trying to add this element to
> the queue (but the element is not added because the queue is full).
> 3. Now a snapshot is taken: the whole queue of N elements is
> written into the ListState in the snapshotState method and, more
> importantly, the pendingStreamElementQueueEntry is written to this list too.
> 4. The process is restarted, so it tries to recover all the elements
> and put them back into the queue, but the list of recovered elements holds
> N+1 elements and the queue capacity is only N. The process has not started yet, so
> it cannot process any element, and the extra element waits endlessly.
> It is never added, and the process never processes anything. Deadlock.
> 5. The trigger is fired and discarded because the process is not running
> yet.
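
The scenario above, and the reordering proposed in the pull request, can be sketched with a plain bounded queue. This is a minimal illustration and not Flink's code: the class RecoveryOrderDemo, the method recover, and the 200 ms timeout are hypothetical names and values chosen for the demo. A timed offer stands in for the blocking insert so that the program terminates instead of hanging.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class RecoveryOrderDemo {

    /**
     * Re-inserts recovered elements into a bounded queue of the given capacity.
     * Returns true if every element was accepted, false if an insert timed out
     * (the timeout stands in for "blocked forever" in the real operator).
     */
    public static boolean recover(int capacity, List<Integer> recovered, boolean startEmitterFirst) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

        // Hypothetical stand-in for the emitter thread: it only drains the queue.
        Thread emitter = new Thread(() -> {
            try {
                while (true) {
                    queue.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        emitter.setDaemon(true);

        // The proposed ordering: start the consumer before refilling the queue.
        if (startEmitterFirst) {
            emitter.start();
        }

        boolean allAdded = true;
        try {
            for (Integer element : recovered) {
                // Bounded wait instead of put(), so the demo cannot hang.
                if (!queue.offer(element, 200, TimeUnit.MILLISECONDS)) {
                    allAdded = false;
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            allAdded = false;
        }

        // Stop the drain loop if it was started.
        if (startEmitterFirst) {
            emitter.interrupt();
        }
        return allAdded;
    }

    public static void main(String[] args) {
        // N = 3, and the recovered list holds N + 1 = 4 elements.
        List<Integer> recovered = Arrays.asList(1, 2, 3, 4);
        System.out.println("emitter started after refill:  " + recover(3, recovered, false));
        System.out.println("emitter started before refill: " + recover(3, recovered, true));
    }
}
```

With nothing draining the queue, the fourth insert cannot succeed, which mirrors step 4. With the consumer already running, all N+1 elements fit over time.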

This message was sent by Atlassian JIRA
