flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5048) Kafka Consumer (0.9/0.10) threading model leads problematic cancellation behavior
Date Mon, 14 Nov 2016 06:59:59 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15662985#comment-15662985
] 

ASF GitHub Bot commented on FLINK-5048:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2789#discussion_r87738353
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
---
    @@ -143,133 +123,26 @@ public Kafka09Fetcher(
     
     	@Override
     	public void runFetchLoop() throws Exception {
    --- End diff --
    
    We will be throwing all exceptions, even if it's a `Handover.ClosedException`, correct?
    
    I wonder if it makes sense to suppress `Handover.ClosedException` so that it does not reach
    the main task thread, and only restore the interruption state that follows `cancel()`. So basically,
    we would catch `InterruptedException` over the whole `runFetchLoop()` scope.
    
    This is how exception passing behaved before: when `cancel()` was called on the fetcher,
    we did not throw any other exceptions and only restored the interruption state on the
    main task thread.
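
A minimal sketch of the behaviour suggested above, not the actual `Kafka09Fetcher` code. The names `FetchLoopSketch`, `RecordSource`, and the nested `ClosedException` are hypothetical stand-ins (the last one for `Handover.ClosedException`). The loop swallows the closed signal, restores the interrupt flag on interruption, and lets every other exception propagate to the caller.

```java
class FetchLoopSketch {

    /** Hypothetical stand-in for Handover.ClosedException. */
    static final class ClosedException extends Exception {}

    /** Hypothetical source of records; next() may block, be interrupted, or be closed. */
    interface RecordSource {
        String next() throws Exception;
    }

    private volatile boolean running = true;
    int emitted = 0;

    void cancel() {
        running = false;
    }

    void runFetchLoop(RecordSource source) throws Exception {
        try {
            while (running) {
                String record = source.next();
                if (record != null) {
                    emitted++; // placeholder for emitting the record downstream
                }
            }
        } catch (ClosedException e) {
            // Suppressed: the handover was closed as part of shutdown,
            // so nothing is rethrown to the main task thread.
        } catch (InterruptedException e) {
            // Only restore the interruption state for the caller.
            Thread.currentThread().interrupt();
        }
        // Any other exception type propagates unchanged.
    }
}
```

Whether the closed signal should be swallowed unconditionally, or only after `cancel()` has been called, is a design choice this sketch leaves open.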


> Kafka Consumer (0.9/0.10) threading model leads problematic cancellation behavior
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-5048
>                 URL: https://issues.apache.org/jira/browse/FLINK-5048
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.1.3
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.2.0
>
>
> The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that operates the KafkaConsumer.
That thread is shielded from interrupts, because the Kafka Consumer has not been handling
thread interrupts well.
> Since that thread is also the thread that emits records, it may block in the network
stack (backpressure) or in chained operators. The latter case leads to situations where cancellation
becomes very slow unless that thread is interrupted (which it cannot be).
> I propose to change the thread model as follows:
>   - A spawned consumer thread pulls from the KafkaConsumer and pushes its pulled batch
of records into a blocking queue (size one)
>   - The main thread of the task will pull the record batches from the blocking queue
and emit the records.
> This actually allows for some additional I/O overlap while limiting the additional memory
consumption - only two batches are ever held, one being fetched and one being emitted.
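
The proposed thread model can be sketched roughly as follows. This is an illustration under assumptions, not the code from the pull request: `HandoverSketch`, `run`, and the `END` marker are hypothetical names, a plain list of batches stands in for polling the KafkaConsumer, and a capacity-one `ArrayBlockingQueue` plays the role of the size-one blocking queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class HandoverSketch {

    // Hypothetical end-of-input marker, compared by identity.
    private static final List<String> END = new ArrayList<>();

    /** Runs a consumer thread that hands batches to the calling (main) thread. */
    static List<String> run(List<List<String>> batches) throws InterruptedException {
        // Blocking queue of size one between the two threads.
        BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(1);

        Thread consumerThread = new Thread(() -> {
            try {
                for (List<String> batch : batches) {
                    queue.put(batch); // blocks while the previous batch is still queued
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer-thread");
        consumerThread.setDaemon(true);
        consumerThread.start();

        // The main thread takes batches and emits the records, so it is the
        // thread that may block downstream and it stays interruptible.
        List<String> emitted = new ArrayList<>();
        while (true) {
            List<String> batch = queue.take();
            if (batch == END) {
                break;
            }
            emitted.addAll(batch);
        }
        consumerThread.join();
        return emitted;
    }
}
```

Because `queue.take()` runs on the calling thread, interrupting that thread during cancellation wakes it up immediately, while the consumer thread is never the one blocked on emitting.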



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
