kafka-dev mailing list archives

From "huxi (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4767) KafkaProducer is not joining its IO thread properly
Date Thu, 16 Feb 2017 14:41:41 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15870055#comment-15870055 ]

huxi commented on KAFKA-4767:
-----------------------------

What do you mean by "leaking the IO thread"? Do you mean that it could not be shut down successfully
after the user thread in which KafkaProducer.close was invoked was interrupted? That should
not happen, since this.sender.initiateClose() always runs even when the user thread is
interrupted.

In my opinion, interrupting the user thread is no different from invoking ioThread.join with
a relatively small timeout, because there is still a chance to force-close the IO thread and
wait for it again. That is also why we swallow the InterruptedException during the first join.
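
To make the argument concrete, here is a minimal sketch of that two-phase shutdown. It is not the actual KafkaProducer source: the class TwoPhaseClose, the flags running and forceClose, and the retry loop around the second join are all made up for illustration, and the flags only stand in for what initiateClose() and a forced close would do.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a producer that owns a background IO thread. */
class TwoPhaseClose {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicBoolean forceClose = new AtomicBoolean(false);
    private final Thread ioThread;

    /** drainMillis: how long the worker pretends to flush after a graceful close request. */
    TwoPhaseClose(long drainMillis) {
        ioThread = new Thread(() -> {
            while (running.get()) {
                sleepQuietly(5);
            }
            // Simulated drain phase: ends early when a forced close is requested.
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(drainMillis);
            while (!forceClose.get() && System.nanoTime() < deadline) {
                sleepQuietly(5);
            }
        }, "sketch-io-thread");
        ioThread.setDaemon(true);
        ioThread.start();
    }

    /** timeout is assumed to be > 0 here; Thread.join(0) would wait forever. */
    void close(long timeout, TimeUnit unit) {
        boolean interrupted = false;
        running.set(false); // graceful request, always issued (stand-in for initiateClose)
        try {
            ioThread.join(unit.toMillis(timeout));
        } catch (InterruptedException e) {
            // Swallowed on purpose: behaves like a very short timeout on the first join.
            interrupted = true;
        }
        if (ioThread.isAlive()) {
            forceClose.set(true); // second phase: force the worker to stop
            while (ioThread.isAlive()) {
                try {
                    ioThread.join();
                } catch (InterruptedException e) {
                    interrupted = true; // assumption of this sketch: keep waiting
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the caller's interrupt status
        }
    }

    boolean isIoThreadAlive() {
        return ioThread.isAlive();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this sketch, an interrupt that arrives during the first join only shortens the graceful phase. The worker is still forced to stop and is joined afterwards, so it is not left running.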

Does that sound reasonable to you? And out of curiosity, did you encounter any cases
where the IO thread failed to shut down?

> KafkaProducer is not joining its IO thread properly
> ---------------------------------------------------
>
>                 Key: KAFKA-4767
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4767
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.11.0.0
>            Reporter: Buğra Gedik
>            Priority: Minor
>
> The {{KafkaProducer}} is not properly joining the thread it creates. The code is like this:
> {code}
> try {
>     this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
>     firstException.compareAndSet(null, t);
>     log.error("Interrupted while joining ioThread", t);
> }
> {code}
> If the code is interrupted while performing the join, it will end up leaving the IO thread running. The correct way of handling this is as follows:
> {code}
> try {
>     this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
>     // propagate the interrupt
>     this.ioThread.interrupt();
>     try {
>         this.ioThread.join();
>     } catch (InterruptedException e) {
>         firstException.compareAndSet(null, e);
>         log.error("Interrupted while joining ioThread", e);
>     } finally {
>         // make sure we maintain the interrupted status
>         Thread.currentThread().interrupt();
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
