Date: Thu, 7 Dec 2017 13:23:00 +0000 (UTC)
From: "dongtingting (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Commented] (FLINK-8093) flink job fail because of kafka producer create fail of "javax.management.InstanceAlreadyExistsException"

[ https://issues.apache.org/jira/browse/FLINK-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16281834#comment-16281834 ]

dongtingting commented on FLINK-8093:
-------------------------------------

[~aljoscha] I think the client id generation is thread-safe and static. But one TaskManager may have multiple slots, and different slots use different environments and KafkaProducer classes. So one TaskManager may end up with several producers that have the same client id, while the metrics are registered with the MBean server (com.sun.jmx.mbeanserver), of which there is only one per JVM.
The producers with the same client id then conflict when they register with that single MBean server.

We worked around this problem in user code by setting:

    properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "producer-" + topic + timestamp);

This avoids the conflict.

In addition, we would like to modify the Flink code to avoid such conflicts in general.

> flink job fail because of kafka producer create fail of "javax.management.InstanceAlreadyExistsException"
> ---------------------------------------------------------------------------------------------------------
>
> Key: FLINK-8093
> URL: https://issues.apache.org/jira/browse/FLINK-8093
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.3.2
> Environment: flink 1.3.2, kafka 0.9.1
> Reporter: dongtingting
> Priority: Critical
>
> One TaskManager has multiple task slots. One task failed because creating the KafkaProducer failed; the reason is "javax.management.InstanceAlreadyExistsException: kafka.producer:type=producer-metrics,client-id=producer-3". The detailed trace is:
> 2017-11-04 19:41:23,281 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Filter -> Map -> Filter -> Sink: dp_client_**_log (7/80) (99551f3f892232d7df5eb9060fa9940c) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getKafkaProducer(FlinkKafkaProducerBase.java:202)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.open(FlinkKafkaProducerBase.java:212)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.KafkaException: Error registering mbean kafka.producer:type=producer-metrics,client-id=producer-3
>     at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159)
>     at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77)
>     at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288)
>     at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:255)
>     at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:239)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.registerMetrics(RecordAccumulator.java:137)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.<init>(RecordAccumulator.java:111)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:261)
>     ... 9 more
> Caused by: javax.management.InstanceAlreadyExistsException: kafka.producer:type=producer-metrics,client-id=producer-3
>     at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>     at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>     at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157)
>     ... 16 more
>
> I suspect that tasks in different task slots of one TaskManager use different classloaders, and that the task id may be the same within one process. This leads to the KafkaProducer creation failure in one TaskManager.
> Has anybody encountered the same problem?
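The conflict described in the comment can be reproduced without Kafka or Flink. The following is a minimal sketch, assuming only the JDK's javax.management API: `DuplicateClientIdDemo`, `ProducerMetrics`, `ProducerMetricsMBean` and `register` are hypothetical stand-ins for what Kafka's JmxReporter does, and only the ObjectName pattern is taken from the trace. It does not model the per-slot classloaders; it only shows that a second registration under the same client id in the JVM-wide platform MBean server is rejected, while a distinct client id (the idea behind setting ProducerConfig.CLIENT_ID_CONFIG) registers cleanly.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DuplicateClientIdDemo {

    // Hypothetical standard MBean interface standing in for Kafka's producer metrics.
    // Standard MBean naming rule: implementation class name + "MBean".
    public interface ProducerMetricsMBean {
        int getRecordCount();
    }

    // Hypothetical implementation; a real producer would expose live metric values.
    public static class ProducerMetrics implements ProducerMetricsMBean {
        @Override
        public int getRecordCount() {
            return 0;
        }
    }

    // Registers a metrics MBean under the naming scheme seen in the stack trace.
    // Returns false if that name is already registered in this MBean server.
    public static boolean register(MBeanServer server, String clientId) {
        try {
            ObjectName name = new ObjectName(
                    "kafka.producer:type=producer-metrics,client-id=" + clientId);
            server.registerMBean(new ProducerMetrics(), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            // Same failure as the innermost "Caused by" in the trace.
            return false;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The platform MBean server is shared by every task slot in the JVM.
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        System.out.println(register(server, "producer-3"));          // first slot: true
        System.out.println(register(server, "producer-3"));          // second slot, same id: false
        System.out.println(register(server, "producer-my-topic-1")); // distinct id: true
    }
}
```

In this sketch the helper returns false instead of propagating the exception; Kafka's JmxReporter instead wraps it in a KafkaException, which then makes the producer constructor fail, as the trace above shows.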