Date: Thu, 16 Feb 2017 14:41:41 +0000 (UTC)
From: "huxi (JIRA)"
To: dev@kafka.apache.org
Subject: [jira] [Commented] (KAFKA-4767) KafkaProducer is not joining its IO thread properly

    [ https://issues.apache.org/jira/browse/KAFKA-4767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15870055#comment-15870055 ]

huxi commented on KAFKA-4767:
-----------------------------

What do you mean by "leaking the IO thread"? Do you mean that it cannot be shut down successfully after the user thread in which KafkaProducer.close was invoked is interrupted? That should not happen, since this.sender.initiateClose() always runs even when the user thread is interrupted.

In my opinion, interrupting the user thread is no different from invoking ioThread.join with a relatively small timeout, because there is still a chance to force close the IO thread and wait for it again. That is also why we swallow the InterruptedException during the first join.

Does that look good to you? And out of curiosity, did you encounter any cases where the IO thread failed to shut down?

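To make the sequence described in the comment above concrete, here is a minimal, self-contained Java sketch. It is not Kafka's code: DemoSender and DemoProducer are hypothetical stand-ins, only the method name initiateClose is taken from the comment, and forceClose is an assumed name for the "force close" step. The sketch shows why an interrupt during the first join need not leave the worker running: the stop request has already been issued, and a second join follows.

```java
// Hypothetical stand-in for the producer's I/O loop (not Kafka's Sender).
final class DemoSender implements Runnable {
    private volatile boolean running = true;

    // Graceful stop request; name borrowed from the comment.
    void initiateClose() { running = false; }

    // Assumed name for the "force close" step; here it simply repeats the stop request.
    void forceClose() { running = false; }

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(10); // simulate one unit of I/O work
            } catch (InterruptedException e) {
                // Ignored on purpose: this loop exits via the running flag.
            }
        }
    }
}

// Hypothetical stand-in for the object that owns the I/O thread.
final class DemoProducer {
    private final DemoSender sender = new DemoSender();
    private final Thread ioThread = new Thread(sender, "demo-io-thread");

    DemoProducer() { ioThread.start(); }

    // Returns true if the I/O thread has terminated when close() returns.
    boolean close(long timeoutMs) {
        sender.initiateClose();          // always runs before any waiting happens
        boolean interrupted = false;
        try {
            ioThread.join(timeoutMs);    // first, bounded wait
        } catch (InterruptedException e) {
            interrupted = true;          // swallowed here, restored below
        }
        if (ioThread.isAlive()) {
            sender.forceClose();         // second chance to stop the worker
            try {
                ioThread.join();         // wait again, this time unbounded
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // keep the caller's interrupt status
        }
        return !ioThread.isAlive();
    }
}
```

Restoring the interrupt status at the end is a choice made for this sketch; the comment itself only says that the first InterruptedException is swallowed.
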
> KafkaProducer is not joining its IO thread properly
> ---------------------------------------------------
>
>                 Key: KAFKA-4767
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4767
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 0.11.0.0
>            Reporter: Buğra Gedik
>            Priority: Minor
>
> The {{KafkaProducer}} is not properly joining the thread it creates. The code is like this:
> {code}
> try {
>     this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
>     firstException.compareAndSet(null, t);
>     log.error("Interrupted while joining ioThread", t);
> }
> {code}
> If the code is interrupted while performing the join, it will end up leaving the io thread running. The correct way of handling this is as follows:
> {code}
> try {
>     this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
>     // propagate the interrupt to the io thread
>     this.ioThread.interrupt();
>     try {
>         this.ioThread.join();
>     } catch (InterruptedException e) {
>         // interrupted a second time while waiting; record it and stop waiting
>         firstException.compareAndSet(null, e);
>         log.error("Interrupted while joining ioThread", e);
>     } finally {
>         // make sure we maintain the interrupted status
>         Thread.currentThread().interrupt();
>     }
> }
> {code}
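
The interrupt-propagating join proposed in the issue description can be tried in isolation with the sketch below. InterruptPropagation and joinOrInterrupt are hypothetical names chosen for this illustration, and the helper leaves out the firstException and logging pieces of the quoted snippet. Note that forwarding the interrupt only helps if the target thread actually reacts to interruption, which is assumed here.

```java
// Hypothetical helper illustrating the pattern from the issue description.
final class InterruptPropagation {

    // Waits up to timeoutMs for target to finish (a value of 0 waits indefinitely,
    // as with Thread.join). If the caller is interrupted while waiting, the
    // interrupt is forwarded to target, the caller waits for target to exit, and
    // the caller's interrupted status is restored.
    // Returns true if target has terminated when the method returns.
    static boolean joinOrInterrupt(Thread target, long timeoutMs) {
        try {
            target.join(timeoutMs);
        } catch (InterruptedException first) {
            target.interrupt(); // propagate the interrupt
            try {
                target.join();
            } catch (InterruptedException second) {
                // Interrupted again while waiting; stop waiting.
            } finally {
                // Maintain the caller's interrupted status.
                Thread.currentThread().interrupt();
            }
        }
        return !target.isAlive();
    }

    // Small usage example: the worker exits as soon as it is interrupted.
    public static void main(String[] args) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // Treat the interrupt as a request to stop.
            }
        }, "demo-worker");
        worker.start();

        Thread.currentThread().interrupt(); // simulate an interrupted caller
        boolean stopped = joinOrInterrupt(worker, 1_000);
        System.out.println("worker stopped: " + stopped);
    }
}
```
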