flink-dev mailing list archives

From "Till Rohrmann (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-5638) Deadlock when closing two chained async I/O operators
Date Wed, 25 Jan 2017 11:03:26 GMT
Till Rohrmann created FLINK-5638:

             Summary: Deadlock when closing two chained async I/O operators
                 Key: FLINK-5638
                 URL: https://issues.apache.org/jira/browse/FLINK-5638
             Project: Flink
          Issue Type: Bug
          Components: Local Runtime
    Affects Versions: 1.2.0, 1.3.0
            Reporter: Till Rohrmann
            Assignee: Till Rohrmann
             Fix For: 1.3.0, 1.2.1

The {{AsyncWaitOperator}} can deadlock in a special case: when two chained {{AsyncWaitOperators}}
are closed while one element is still in flight between them.

The deadlock scenario is the following. Consider two chained {{AsyncWaitOperators}}, {{a1}} and
{{a2}}. The last element of {{a1}} completes. This notifies {{a1's}} {{Emitter}}, {{e1}},
to remove the element from the queue and output it to {{a2}}. This poll-and-output operation
happens under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the {{e1}} thread
directly calls {{a2's}} {{processElement}} function, which tries to add the element to
{{a2's}} {{StreamElementQueue}}. Now assume that this queue is full. Then the operation
releases the checkpoint lock and waits until it is notified again.
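To make the lock release concrete, here is a minimal Java sketch, assuming the bounded queue guards its {{put}} with {{Object.wait()}} on the checkpoint lock. {{LockReleasingQueue}} and {{LockReleaseDemo}} are hypothetical names, not Flink's {{StreamElementQueue}}. A producer blocked on a full queue gives up the checkpoint lock, so another thread can take it in the meantime; the demo returns the element left in the queue after the blocked producer finishes.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for a bounded element queue guarded by the checkpoint lock.
// Illustration only; this is not Flink's StreamElementQueue.
class LockReleasingQueue<T> {
    private final Object checkpointLock;
    private final int capacity;
    private final ArrayDeque<T> elements = new ArrayDeque<>();

    LockReleasingQueue(Object checkpointLock, int capacity) {
        this.checkpointLock = checkpointLock;
        this.capacity = capacity;
    }

    // Caller must hold checkpointLock. While the queue is full, Object.wait()
    // releases the lock, so the caller's "atomic" section is no longer atomic.
    void put(T element) throws InterruptedException {
        while (elements.size() >= capacity) {
            checkpointLock.wait();
        }
        elements.addLast(element);
        checkpointLock.notifyAll();
    }

    // Caller must hold checkpointLock. Frees a slot and wakes blocked producers.
    T poll() {
        T head = elements.pollFirst();
        checkpointLock.notifyAll();
        return head;
    }
}

class LockReleaseDemo {
    // A producer blocks in put() on a full queue; the main thread can still take the
    // checkpoint lock, poll an element, and let the producer finish.
    static String run() {
        Object checkpointLock = new Object();
        LockReleasingQueue<String> queue = new LockReleasingQueue<>(checkpointLock, 1);
        try {
            synchronized (checkpointLock) {
                queue.put("first"); // queue is now full
            }
            Thread producer = new Thread(() -> {
                synchronized (checkpointLock) {
                    try {
                        queue.put("second"); // blocks and releases checkpointLock
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            producer.start();
            while (producer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            synchronized (checkpointLock) { // only possible because wait() released the lock
                queue.poll();
            }
            producer.join();
            synchronized (checkpointLock) {
                return queue.poll();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
```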

In the meantime, the {{StreamTask}} calls {{a1.close()}} because all input has been consumed.
The close operation also happens under the checkpoint lock. First, the close method waits on
the checkpoint lock until all elements of {{a1's}} {{StreamElementQueue}} have been processed
(i.e. the queue is empty). Because {{e1}} has already removed the last element, this condition
holds even though the element has not yet been handed to {{a2}}. Next, {{e1}} is interrupted
and the close method joins on {{e1}}. At this point {{e1}} is waiting on the checkpoint lock,
and it must reacquire the lock before it can react to the interrupt. Since the close operation
does not release the checkpoint lock while joining, {{e1}} can never regain it, and we have
a deadlock.
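The close path can be reproduced in isolation with the following hypothetical Java sketch, assuming {{e1}} is parked in {{Object.wait()}} on the checkpoint lock. It uses a timed {{join(200)}} so that it terminates; a {{join()}} without a timeout would hang at that point. Because {{Thread.join}} waits on the thread object rather than on the checkpoint lock, the closer keeps holding the lock, and the returned state shows that {{e1}} is still alive (on HotSpot typically {{BLOCKED}}, waiting to reacquire the lock).

```java
// Hypothetical reproduction of the close() path; names are illustrative only.
class InterruptUnderLockDemo {
    static Thread.State run() {
        Object checkpointLock = new Object();
        Thread emitter = new Thread(() -> {
            synchronized (checkpointLock) {
                try {
                    // Stands in for e1 waiting inside a2.processElement on a full queue.
                    checkpointLock.wait();
                } catch (InterruptedException e) {
                    // Only reached after checkpointLock has been reacquired.
                }
            }
        }, "emitter-e1");
        try {
            emitter.start();
            while (emitter.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            Thread.State observed;
            synchronized (checkpointLock) { // a1.close() runs under the checkpoint lock
                emitter.interrupt();
                // join() waits on the Thread object, so checkpointLock stays held.
                // A join() without a timeout would never return here.
                emitter.join(200);
                observed = emitter.getState(); // still alive, waiting for checkpointLock
            }
            emitter.join(); // lock released: the emitter can finally finish
            return observed;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
```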

Two problems cause the deadlock:

1. We assume that the {{AsyncWaitOperator}} has processed all its elements once its queue is
empty. This holds as long as the output operation is atomic. In the chained case, however,
the emitter thread may have to wait to insert the element into the queue of the next
{{AsyncWaitOperator}}. In that situation it releases the checkpoint lock, so the output
operation is no longer atomic. We can solve this problem by removing the last element from
the queue after it has been output instead of before.

2. We interrupt the emitter thread while holding the checkpoint lock and never release it.
In this situation the interrupt has no effect, because the emitter thread must reacquire the
checkpoint lock before it can react to it. We should solve the problem by waiting on the
checkpoint lock (which releases it) and periodically checking whether the thread has stopped.
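Both proposed fixes can be sketched in Java as follows. This is a minimal illustration under stated assumptions, not the actual patch: {{FixedAsyncEmitterSketch}}, {{emitHead}} and {{stopEmitter}} are hypothetical names, the queue is a plain {{ArrayDeque}} guarded by the checkpoint lock, and there is a single emitter thread. {{emitHead}} removes the head only after the output call returns, and {{stopEmitter}} replaces the blocking join with a timed wait on the checkpoint lock that re-checks {{Thread.isAlive()}}.

```java
import java.util.ArrayDeque;
import java.util.function.Consumer;

// Hypothetical sketch of both proposed fixes; names are illustrative, not Flink's API.
class FixedAsyncEmitterSketch {
    private final Object checkpointLock = new Object();
    private final ArrayDeque<String> queue = new ArrayDeque<>();

    void add(String element) {
        synchronized (checkpointLock) {
            queue.addLast(element);
            checkpointLock.notifyAll();
        }
    }

    boolean isEmpty() {
        synchronized (checkpointLock) {
            return queue.isEmpty();
        }
    }

    // Fix 1: peek, output, and only then remove the head. If output releases the
    // checkpoint lock (chained case), the element still counts as pending.
    // Assumes a single emitter thread, so the head cannot be emitted twice.
    void emitHead(Consumer<String> output) {
        synchronized (checkpointLock) {
            String head = queue.peekFirst();
            if (head == null) {
                return;
            }
            output.accept(head);
            queue.removeFirst();
            checkpointLock.notifyAll(); // wake a closer waiting for an empty queue
        }
    }

    // Fix 2: caller must hold the lock. The timed wait releases it so an interrupted
    // emitter can reacquire it and exit; a timeout is needed because the emitter's
    // termination does not notify the lock.
    static void stopEmitter(Object lock, Thread emitter, long checkIntervalMillis)
            throws InterruptedException {
        emitter.interrupt();
        while (emitter.isAlive()) {
            lock.wait(checkIntervalMillis);
        }
    }

    // Fix 1 check: true if the queue was non-empty during output and empty afterwards.
    static boolean pendingDuringOutput() {
        FixedAsyncEmitterSketch sketch = new FixedAsyncEmitterSketch();
        sketch.add("last");
        boolean[] seen = new boolean[1];
        sketch.emitHead(element -> seen[0] = !sketch.isEmpty());
        return seen[0] && sketch.isEmpty();
    }

    // Fix 2 check: true if an emitter waiting on the lock terminates while the closer holds it.
    static boolean stopsWhileHoldingLock() {
        Object lock = new Object();
        Thread emitter = new Thread(() -> {
            synchronized (lock) {
                try {
                    while (true) {
                        lock.wait(); // stands in for e1 blocked on a full downstream queue
                    }
                } catch (InterruptedException e) {
                    // reached after the lock is reacquired: exit
                }
            }
        }, "emitter-e1");
        try {
            emitter.start();
            while (emitter.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            synchronized (lock) { // close() holds the checkpoint lock throughout
                stopEmitter(lock, emitter, 10);
            }
            return !emitter.isAlive();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
```

The 10 ms check interval is an arbitrary choice for the sketch; it only bounds how long the closer may lag behind the emitter's termination.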

This message was sent by Atlassian JIRA
