Subject: Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction
From: Tony Wei <tony19920430@gmail.com>
Date: Tue, 24 Sep 2019 17:40:14 +0800
To: Becket Qin
Cc: Fabian Hueske, user@flink.apache.org

Hi Becket,

I have read the kafka source code and found that the error won't be propagated to the client if the list of topic-partitions is empty [1], because the error is bound to each topic-partition. If this list is empty, the error won't be packaged into the response body, so the client never gets the error message it needs to find the new coordinator.

Back to this problem: I think the original design of the kafka client is not to execute `enqueueNewPartitions` if there are no added topic-partitions. It might be a bug here, and we should first check whether the `newPartitionsInTransaction` list is empty before executing the `enqueueNewPartitions` function. Am I right?

If it can be confirmed as a bug, I would like to submit my patch to fix it. Thanks for your help.

Best,
Tony Wei

[1] https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L2042

Tony Wei <tony19920430@gmail.com> wrote on Fri, Sep 20, 2019 at 2:57 PM:

> Hi,
>
> I found that the source code [1] in kafka shows that it always checks whether
> `newPartitionsInTransaction` is empty before calling
> `enqueueRequest(addPartitionsToTransactionHandler())`, but this check is not
> applied in the flink kafka producer code [2].
>
> I wrote a simple producer with the `flushNewPartitions` copied from the flink
> kafka producer, and successfully reproduced this exception. Then, I modified
> the logic in `enqueueNewPartitions` to check whether there is any
> `newPartitionsInTransaction` before making this request. This worked well even
> when I restarted the broker that owned this transaction's coordinator, since
> the empty transaction won't make any request to the server.
>
> The attachments are my simple producer code. Please help to verify whether what
> I thought is correct. Thanks.
>
> Best,
> Tony Wei
>
> [1] https://github.com/apache/kafka/blob/c0019e653891182d7a95464175c9b4ef63f8bae1/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L316
> [2] https://github.com/apache/flink/blob/09f96b339f4890d7a44ae92c915ea8c0f6f244cb/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java#L273
>
> Tony Wei <tony19920430@gmail.com> wrote on Fri, Sep 20, 2019 at 11:56 AM:
>
>> Hi,
>>
>> Trying to dig out why `Error.NOT_COORDINATOR` happened in the broker, I raised
>> flink's log level to DEBUG for the producer, and I found some logs from the
>> flink side regarding this error. Below is a log snippet.
>>
>> It seems that the producer client didn't catch this error and retry to find
>> the new coordinator. This caused the transaction state to become inconsistent
>> between the client side and the server side. Would it be possible that the
>> problem is caused by FlinkKafkaInternalProducer using java reflection to send
>> the `addPartitionsToTransactionHandler` request in
>> `FlinkKafkaInternalProducer#flushNewPartitions`? Is there any expert who is
>> familiar with both kafka and flink's kafka connector who could help me solve
>> this? Thanks very much.
>>
>> The attachment is my code to reproduce this problem.
>> The cluster's versions are the same as I mentioned in my first email.
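[Editor's note: the guard described above can be sketched as below. This is a minimal, hypothetical stand-in for the producer-side bookkeeping, not the real Kafka `TransactionManager`; the field name `newPartitionsInTransaction` and method name `flushNewPartitions` follow the sources linked in the thread, everything else is illustrative.]

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the proposed fix: only enqueue an
// AddPartitionsToTxnRequest when the transaction actually added partitions,
// so committing an empty transaction never talks to the coordinator.
class FlushNewPartitionsGuard {

    private final Set<String> newPartitionsInTransaction = new HashSet<>();
    private int enqueuedRequests = 0;

    void addPartition(String topicPartition) {
        newPartitionsInTransaction.add(topicPartition);
    }

    // Returns true only if a request was actually enqueued.
    boolean flushNewPartitions() {
        // The proposed check: skip the request entirely for an empty transaction.
        if (newPartitionsInTransaction.isEmpty()) {
            return false;
        }
        enqueuedRequests++;                 // stands in for enqueueRequest(...)
        newPartitionsInTransaction.clear(); // these partitions are now in flight
        return true;
    }

    int enqueuedRequests() {
        return enqueuedRequests;
    }
}
```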
>>
>> Best,
>> Tony Wei
>>
>> *flink taskmanager:*
>>
>>> 2019-09-20 02:32:45,927 INFO  org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer  - Flushing new partitions
>>> 2019-09-20 02:32:45,927 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1, partitions=[])
>>> 2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.producer.internals.Sender  - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1, partitions=[]) to node *kafka-broker-1:9092* (id: 1 rack: null)
>>> 2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.NetworkClient  - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Using older server API v0 to send ADD_PARTITIONS_TO_TXN {transactional_id=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3,producer_id=1008,producer_epoch=1,topics=[]} with correlation id 12 to node 1
>>> 2019-09-20 02:32:45,937 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Successfully added partitions [] to transaction
>>
>> *kafka-broker-1:*
>>
>>> [2019-09-20 02:31:46,182] INFO [TransactionCoordinator id=1] Initialized transactionalId map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3 with producerId 1008 and producer epoch 1 on partition __transaction_state-37 (kafka.coordinator.transaction.TransactionCoordinator)
>>> [2019-09-20 02:32:45,962] DEBUG [TransactionCoordinator id=1] Returning NOT_COORDINATOR error code to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
>>> [2019-09-20 02:32:46,453] DEBUG [TransactionCoordinator id=1] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
>>
>> Tony Wei <tony19920430@gmail.com> wrote on Thu, Sep 19, 2019 at 6:25 PM:
>>
>>> Hi Becket,
>>>
>>> I found that those transactions tended to fail with InvalidTxnStateException
>>> if they never sent any records but committed after some brokers were
>>> restarted.
>>>
>>> Because the failing state transition was always from EMPTY to COMMIT, I ran a
>>> job with only one parallelism, with or without output to Kafka. I tried to
>>> restart brokers and see what happened in these two situations, and found that
>>> I couldn't make the job fail when it continuously emitted output to Kafka,
>>> but it could fail when it didn't send any output to Kafka.
>>>
>>> I'm not familiar with FlinkKafkaProducer's behavior. I tried to use the kafka
>>> java producer to reproduce the exception, but it worked well. Maybe my
>>> observation is not correct, but the experiment result seems like that. Do you
>>> have any thoughts on this?
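[Editor's note: the symptom above, commits failing only when no records were sent, matches the broker DEBUG logs later in this thread ("state is Empty, but received transaction marker result to send: COMMIT"). A toy model of that coordinator-side check is sketched below; the class and names are illustrative, not Kafka's real code.]

```java
// Toy model of the coordinator-side state check implied by the broker logs in
// this thread. Illustrative only; not the real kafka.coordinator.transaction code.
class CoordinatorModel {
    enum TxnState { EMPTY, ONGOING, PREPARE_COMMIT, COMPLETE_COMMIT }
    enum TxnError { NONE, INVALID_TXN_STATE }

    private TxnState state = TxnState.EMPTY;

    // AddPartitionsToTxn is what moves a transaction out of EMPTY.
    void addPartitions() {
        state = TxnState.ONGOING;
    }

    // EndTxn(COMMIT): only a transaction that actually added partitions
    // (ONGOING) can be committed here; a COMMIT arriving while the state is
    // still EMPTY is the invalid EMPTY -> COMMIT transition seen in the logs.
    TxnError endTxnCommit() {
        if (state != TxnState.ONGOING) {
            return TxnError.INVALID_TXN_STATE;
        }
        state = TxnState.PREPARE_COMMIT;
        return TxnError.NONE;
    }

    TxnState state() {
        return state;
    }
}
```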
>>>
>>> Best,
>>> Tony Wei
>>>
>>> Tony Wei <tony19920430@gmail.com> wrote on Thu, Sep 19, 2019 at 11:08 AM:
>>>
>>>> Hi Becket,
>>>>
>>>> One more thing: I have tried to restart other brokers without the active
>>>> controller, and this exception might happen as well. So it should be
>>>> independent of the active controller, like you said.
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> Tony Wei <tony19920430@gmail.com> wrote on Wed, Sep 18, 2019 at 6:14 PM:
>>>>
>>>>> Hi Becket,
>>>>>
>>>>> I have reproduced this problem in our development environment. Below are
>>>>> the log messages at debug level. It seems that the exception was from
>>>>> broker-3, and I also found another error code on broker-2 during that time.
>>>>>
>>>>> There are other INVALID_TXN_STATE errors for other transaction ids; I just
>>>>> list one of them. The log messages only show entries containing the
>>>>> `kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7` substring before
>>>>> `2019-09-18 07:14`.
>>>>>
>>>>> I didn't see other information to find out why the producer tried to move
>>>>> the transaction state from EMPTY to COMMIT, or what made NOT_COORDINATOR
>>>>> happen. Do you have any thought about what's happening? Thanks.
>>>>>
>>>>> *Number of Kafka brokers: 3*
>>>>> *logging config for kafka:*
>>>>>
>>>>>> log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
>>>>>> log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
>>>>>> log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
>>>>>> log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
>>>>>> log4j.appender.transactionAppender.MaxFileSize=10MB
>>>>>> log4j.appender.transactionAppender.MaxBackupIndex=10
>>>>>> log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
>>>>>> log4j.additivity.kafka.coordinator.transaction=true
>>>>>
>>>>> *flink-ui*
>>>>>
>>>>>> Timestamp: 2019-09-18, 07:13:43
>>>>>>
>>>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>>>>>>     ... 5 more
>>>>>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state
>>>>>
>>>>> *broker-3*
>>>>>
>>>>>> [2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
>>>>>> [2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
>>>>>> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
>>>>>> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
>>>>>> [2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=4, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790826831) with coordinator epoch 4 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>>
>>>>> *broker-2*
>>>>>
>>>>>> [2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=0, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789126318) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>>> [2019-09-18 06:54:27,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=1, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789667979) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>>> [2019-09-18 07:06:25,419] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=2, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790385417) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>>> [2019-09-18 07:11:42,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=3, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790702969) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>>> [2019-09-18 07:13:42,779] DEBUG [TransactionCoordinator id=2] Returning NOT_COORDINATOR error code to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
>>>>>> [2019-09-18 07:13:43,633] DEBUG [TransactionCoordinator id=2] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
>>>>>
>>>>> Best,
>>>>> Tony Wei
>>>>>
>>>>> Becket Qin wrote on Mon, Sep 2, 2019 at 10:03 PM:
>>>>>
>>>>>> Hi Tony,
>>>>>>
>>>>>> From the symptom it is not quite clear to me what may cause this issue.
>>>>>> Supposedly the TransactionCoordinator is independent of the active
>>>>>> controller, so bouncing the active controller should not have special
>>>>>> impact on the transactions (at least not every time). If this is stably
>>>>>> reproducible, is it possible to turn on debug level logging on
>>>>>> kafka.coordinator.transaction.TransactionCoordinator to see what the
>>>>>> broker says?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jiangjie (Becket) Qin
>>>>>>
>>>>>> On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <tony19920430@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Has anyone run into the same problem? I have updated my producer
>>>>>>> transaction timeout to 1.5 hours, but the problem still happened when I
>>>>>>> restarted the broker with the active controller. So it might not be due
>>>>>>> to a long checkpoint duration causing a transaction timeout. I have no
>>>>>>> more clues to find out what's wrong with my kafka producer. Could someone
>>>>>>> help me please?
>>>>>>>
>>>>>>> Best,
>>>>>>> Tony Wei
>>>>>>>
>>>>>>> Fabian Hueske wrote on Fri, Aug 16, 2019 at 4:10 PM:
>>>>>>>
>>>>>>>> Hi Tony,
>>>>>>>>
>>>>>>>> I'm sorry I cannot help you with this issue, but Becket (in CC) might
>>>>>>>> have an idea what went wrong here.
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>> On Wed, Aug 14, 2019 at 07:00, Tony Wei <tony19920430@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Currently, I am trying to update our kafka cluster with a larger
>>>>>>>>> `transaction.max.timeout.ms`. The original setting is kafka's default
>>>>>>>>> value (i.e. 15 minutes), and I tried to set it to 3 hours.
>>>>>>>>>
>>>>>>>>> When I was doing a rolling restart of my brokers, this exception came
>>>>>>>>> to me on the next checkpoint after I restarted the broker with the
>>>>>>>>> active controller.
>>>>>>>>>
>>>>>>>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>>>>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>>>>>>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>>>>>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>>>>>>>>>>     ... 5 more
>>>>>>>>>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state
>>>>>>>>>
>>>>>>>>> I have no idea why it happened, and I didn't find any error logs on the
>>>>>>>>> brokers. Has anyone seen this exception before? How can I prevent this
>>>>>>>>> exception when I restart the kafka cluster? Does this exception mean
>>>>>>>>> that I will lose data in some of these transactions?
>>>>>>>>>
>>>>>>>>> flink cluster version: 1.8.1
>>>>>>>>> kafka cluster version: 1.0.1
>>>>>>>>> flink kafka producer version: universal
>>>>>>>>> producer transaction timeout: 15 minutes
>>>>>>>>> checkpoint interval: 5 minutes
>>>>>>>>> number of concurrent checkpoints: 1
>>>>>>>>> max checkpoint duration before and after the exception occurred: < 2 minutes
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Tony Wei
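[Editor's note: for the timeout settings discussed in this first email, the constraint is that the producer-side `transaction.timeout.ms` must not exceed the broker-side `transaction.max.timeout.ms`, and should comfortably exceed the checkpoint interval so that transactions survive until the checkpoint completes. A hedged sketch of choosing such a value follows; the helper name and the 3x margin are illustrative, not from the thread.]

```java
import java.util.Properties;

// Illustrative helper (not the thread's actual job code) for picking a
// producer transaction timeout: well above the checkpoint interval, but
// capped by what the broker's transaction.max.timeout.ms allows.
class TxnTimeoutConfig {
    static Properties producerProps(long checkpointIntervalMs, long brokerMaxTimeoutMs) {
        // 3x the checkpoint interval (with a 15-minute floor) is an assumed
        // safety margin, clamped to the broker-side maximum.
        long txnTimeoutMs = Math.min(brokerMaxTimeoutMs,
                Math.max(checkpointIntervalMs * 3, 900_000L));
        Properties props = new Properties();
        props.setProperty("transaction.timeout.ms", Long.toString(txnTimeoutMs));
        props.setProperty("enable.idempotence", "true");
        return props;
    }
}
```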
Hi Becket,

I have read kafka source cod= e and found that the error won't be propagated to client if the list of=
topic-partition is empty [1], because it bind the error with eac= h topic-partition. If this list is empty,
then that error won'= ;t be packaged into response body. That made the client didn't get the = error
message to find the newer coordinator.

=
Back to this problem, I think the original design of kafka client migh= t not prefer to execute
`enqueueNewPartitions` if there is no add= ed topic-partition. It might be a bug here, and we should
first c= heck if `newPartitionsInTransaction` list is empty before executing `enqueu= eNewPartitions`
function. Am I right?

If it can be = confirmed=C2=A0as a bug, I would like to submit my patch to fix it. Thanks = for your help.

Best,
Tony Wei

Tony Wei <tony1992043= 0@gmail.com> =E6=96=BC 2019=E5=B9=B49=E6=9C=8820=E6=97=A5 =E9=80=B1= =E4=BA=94 =E4=B8=8B=E5=8D=882:57=E5=AF=AB=E9=81=93=EF=BC=9A
Hi,

I found that the source code [1] in kafka showed that it always ch= eck if `newPartitionsInTransaction`
is empty before ca= lling `enqueueRequest(addPartitionsToTransactionHandler())`, that is= not
applied to flink kafka producer code [2].

I wrote a simple producer with the `flushNewPartitions` copied from = flink kafka producer, and
successfully reproduce this exception. = Then, I modified the logic in `enqueueNewPartitions` to check
if = there is any `newPartitionsInTransaction` before make this request. And thi= s would work well even
if I restarted the broker who owned this t= ransaction's coordinator, since the empty transaction won't
make any request to server.

The attachments=C2= =A0are my simple producer code. Please help to verify what I thought is=C2= =A0correct. Thanks.

Best,
Tony Wei
=


Tony Wei <tony19920430@gmail.com> =E6=96=BC 2019=E5=B9=B49=E6=9C= =8820=E6=97=A5 =E9=80=B1=E4=BA=94 =E4=B8=8A=E5=8D=8811:56=E5=AF=AB=E9=81=93= =EF=BC=9A
Hi,

Trying to dig out why `Error.NOT_COORDINA= TOR` happened in broker, I opened=C2=A0
flink's log level to = DEBUG for producer. And I found some logs from flink side
regardi= ng this error. Below is some log snippet.

It seems= that producer client didn't catch this error and retry to find new coo= rdinator.
This caused the transaction state is inconsistent=C2=A0= between client side and server side.
Would it be possible that th= e problem is caused by=C2=A0FlinkKafkaInternalProducer using
java= reflection to send `addPartitionsToTransactionHandler` request in
`FlinkKafkaInternalProducer#flushNewPartitions`? Is there any expert who = is familiar
with both kafka and flink's kafka connector could= help me solve this? Thanks very much.

The attachm= ent=C2=A0is my code to reproduce=C2=A0this problem.
The cluster&#= 39;s versions are the same as I mentioned in my first email.

=
Best,
Tony Wei

flink taskm= anager:
2019-09-= 20 02:32:45,927 INFO =C2=A0org.apache.flink.streaming.connectors.kafka.inte= rnal.FlinkKafkaInternalProducer =C2=A0- Flushing new partitions
2019-09-= 20 02:32:45,927 DEBUG org.apache.kafka.clients.producer.internals.Transacti= onManager =C2=A0- [Producer clientId=3Dproducer-29, transactionalId=3Dmap -= > Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Enqueuing transactional= request (type=3DAddPartitionsToTxnRequest, transactionalId=3Dmap -> Sin= k: sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=3D1008, producerEpoc= h=3D1, partitions=3D[])
2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.produ= cer.internals.Sender =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0- [Producer c= lientId=3Dproducer-29, transactionalId=3Dmap -> Sink: sink-2e588ce1c86a9= d46e2e85186773ce4fd-3] Sending transactional request (type=3DAddPartitionsT= oTxnRequest, transactionalId=3Dmap -> Sink: sink-2e588ce1c86a9d46e2e8518= 6773ce4fd-3, producerId=3D1008, producerEpoch=3D1, partitions=3D[]) to node= kafka-broker-1:9092 (id: 1 rack: null)
2019-09-20 02:32:45,931 D= EBUG org.apache.kafka.clients.NetworkClient =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0- [Producer clientId=3D= producer-29, transactionalId=3Dmap -> Sink: sink-2e588ce1c86a9d46e2e8518= 6773ce4fd-3] Using older server API v0 to send ADD_PARTITIONS_TO_TXN {trans= actional_id=3Dmap -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3,produ= cer_id=3D1008,producer_epoch=3D1,topics=3D[]} with correlation id 12 to nod= e 1
2019-09-20 02:32:45,937 DEBUG org.apache.kafka.clients.producer.inte= rnals.TransactionManager =C2=A0- [Producer clientId=3Dproducer-29, transact= ionalId=3Dmap -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Successf= ully added partitions [] to transaction

= kafka-broker-1:
= =C2=A0[2019-09-20 02:31:46,182] INFO [TransactionCoordinator id=3D1] Initia= lized transactionalId map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd= -3 with producerId 1008 and producer epoch 1 on partition __transaction_sta= te-37 (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-20 02:32:45,962] D= EBUG [TransactionCoordinator id=3D1] Returning NOT_COORDINATOR error code t= o client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's = AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator= )
[2019-09-20 02:32:46,453] DEBUG [TransactionCoordinator id=3D1] Aborti= ng append of COMMIT to transaction log with coordinator and returning NOT_C= OORDINATOR error to client for map -> Sink: sink-2e588ce1c86a9d46e2e8518= 6773ce4fd-3's EndTransaction request (kafka.coordinator.transaction.Tra= nsactionCoordinator)



Tony Wei <= ;tony19920430@g= mail.com> =E6=96=BC 2019=E5=B9=B49=E6=9C=8819=E6=97=A5 =E9=80=B1=E5= =9B=9B =E4=B8=8B=E5=8D=886:25=E5=AF=AB=E9=81=93=EF=BC=9A
Hi Becket,
I found that those transactions were tend to be failed with=C2= =A0InvalidTxnStateException if
they never sent any records but co= mmitted after some brokers being restarted.

Becaus= e the error state transition always failed from EMPTY to COMMIT, I run a
job with only one parallelism with or without output to Kafka. I tr= ied to restart brokers
and see what happened=C2=A0on these two si= tuations and found that I couldn't make job failed
when job c= ontinuously emitted output to Kafka, but it could fail when it didn't s= end any
output to Kafka.

I'm not fam= iliar with FlinkKafkaProducer's behavior. I tried to use kafka java pro= ducer
to reproduce the exception, but it worked well. Maybe m= y observation is not correct,
but the experiment result seems lik= e that. Do you have any thoughts on this?

Best,
Tony Wei

Tony Wei <tony19920430@gmail.com> =E6=96=BC 2019=E5=B9=B49= =E6=9C=8819=E6=97=A5 =E9=80=B1=E5=9B=9B =E4=B8=8A=E5=8D=8811:08=E5=AF=AB=E9= =81=93=EF=BC=9A
=
Hi Becket,

One more thing, I have tried= to restart other brokers without active controller, but
this exc= eption might happen as well. So it should be independent=C2=A0 of the activ= e
controller like you said.

Best,
<= div>Tony Wei

Tony Wei <tony19920430@gmail.com> =E6=96=BC 2019=E5=B9=B49=E6= =9C=8818=E6=97=A5 =E9=80=B1=E4=B8=89 =E4=B8=8B=E5=8D=886:14=E5=AF=AB=E9=81= =93=EF=BC=9A
Hi Becket,

I have reproduced this problem = in our development environment. Below is the log message with debug level.<= div>
Seems that the exception was from broker-3, and I also found other= error code in broker-2 during the time.
=
There are others=C2=A0INVALID_TXN_STATE error for other tran= saction id. I just list one of them. Above log messages only
show= s message with `kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's` s= ubstring before `2019-09-18 07:14`.

I didn't see other information explaining why the producer tried to move the transaction state from EMPTY to COMMIT, or what
caused the NOT_COORDINATOR error. Do you have any thoughts about what's happening? Thanks.

Number of Kafka brokers: 3
Logging config for Kafka:
log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.transactionAppender.MaxFileSize=10MB
log4j.appender.transactionAppender.MaxBackupIndex=10
log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
log4j.additivity.kafka.coordinator.transaction=true

flink-ui
Timestamp: 2019-09-18, 07:13:43
java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
    ... 5 more
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

broker-3
[2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=4, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790826831) with coordinator epoch 4 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)

broker-2
[2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=0, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789126318) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 06:54:27,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=1, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789667979) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:06:25,419] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=2, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790385417) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:11:42,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=3, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790702969) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:13:42,779] DEBUG [TransactionCoordinator id=2] Returning NOT_COORDINATOR error code to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:43,633] DEBUG [TransactionCoordinator id=2] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
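The broker-3 lines say the transaction is still in state `Empty` when the `COMMIT` marker result arrives, which is exactly the transition the coordinator rejects with INVALID_TXN_STATE. As a rough, simplified illustration (not Kafka's actual implementation; the state names are borrowed from `kafka.coordinator.transaction`, and the abort-side states are omitted), the relevant part of the state machine can be sketched as:

```java
import java.util.EnumSet;
import java.util.Map;

// Simplified sketch of Kafka's transaction state transitions. The real
// coordinator has more states (PrepareAbort, CompleteAbort, Dead, ...) and
// checks; this only illustrates why a commit of an EMPTY transaction fails.
public class Main {
    enum State { EMPTY, ONGOING, PREPARE_COMMIT, COMPLETE_COMMIT }

    // Which source states may legally move to each target state.
    static final Map<State, EnumSet<State>> VALID_PREDECESSORS = Map.of(
        State.ONGOING, EnumSet.of(State.EMPTY, State.ONGOING, State.COMPLETE_COMMIT),
        State.PREPARE_COMMIT, EnumSet.of(State.ONGOING), // committing requires an ONGOING txn
        State.COMPLETE_COMMIT, EnumSet.of(State.PREPARE_COMMIT)
    );

    static boolean canTransition(State from, State to) {
        EnumSet<State> preds = VALID_PREDECESSORS.get(to);
        return preds != null && preds.contains(from);
    }

    public static void main(String[] args) {
        // A transaction that registered partitions and produced data: fine.
        System.out.println(canTransition(State.ONGOING, State.PREPARE_COMMIT)); // true
        // A transaction still EMPTY (no partitions added) receiving a commit:
        // rejected, which surfaces to the client as InvalidTxnStateException.
        System.out.println(canTransition(State.EMPTY, State.PREPARE_COMMIT));   // false
    }
}
```

A producer only leaves EMPTY by adding partitions to the transaction (i.e. sending records), which matches the observation that jobs emitting no output were the ones that failed.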

Best,
Tony Wei


Becket Qin <becket.qin@gmail.com> wrote on Mon, Sep 2, 2019 at 10:03 PM:
Hi Tony,

From the symptom it is not quite clear to me what may cause this issue. Supposedly the TransactionCoordinator is independent of the active controller, so bouncing the active controller should not have a special impact on the transactions (at least not every time). If this is stably reproducible, is it possible to turn on debug-level logging for kafka.coordinator.transaction.TransactionCoordinator to see what the broker says?

Thanks,
Jiangjie (Becket) Qin

On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <tony19920430@gmail.com> wrote:
Hi,

Has anyone run into the same problem? I have updated my producer transaction timeout to 1.5 hours,
but the problem still happened when I restarted the broker with the active controller. So it is probably
not caused by a long checkpoint duration leading to transaction timeouts. I have no more clues to find out
what's wrong with my Kafka producer. Could someone help me please?

Best,
Tony Wei

Fabian Hueske <fhueske@gmail.com> wrote on Fri, Aug 16, 2019 at 4:10 PM:
Hi Tony,

I'm sorry I cannot help you with this issue, but Becket (in CC) might have an idea what went wrong here.

Best, Fabian

Tony Wei <tony19920430@gmail.com> wrote on Wed, Aug 14, 2019 at 7:00 AM:
Hi,

Currently, I am trying to update our Kafka cluster with a larger `transaction.max.timeout.ms`. The
original setting is Kafka's default value (i.e. 15 minutes), and I tried setting it to 3 hours.

While doing a rolling restart of my brokers, this exception hit me on the next checkpoint
after I restarted the broker with the active controller.

java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
    ... 5 more
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

I have no idea why it happened, and I didn't find any error logs on the brokers. Has anyone seen
this exception before? How can I prevent it when restarting the Kafka cluster?
Does this exception mean that I will lose the data in some of these transactions?

flink cluster version: 1.8.1
kafka cluster version: 1.0.1
flink kafka producer version: universal
producer transaction timeout: 15 minutes
checkpoint interval: 5 minutes
number of concurrent checkpoints: 1
max checkpoint duration before and after the exception occurred: < 2 minutes
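The producer-side timeout in the list above corresponds to the `transaction.timeout.ms` entry in the properties passed to FlinkKafkaProducer; the client value must stay at or below the brokers' `transaction.max.timeout.ms`. A minimal sketch of the constraint, with illustrative values only (the 1-hour client timeout is an assumption, not a setting from this thread):

```java
import java.util.Properties;

// Sketch of the relevant producer-side setting; the FlinkKafkaProducer
// construction itself is omitted and the values here are illustrative.
public class Main {
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:9092"); // placeholder
        // Client-side transaction timeout: 1 hour. It must be <= the brokers'
        // transaction.max.timeout.ms, or transactional initialization fails.
        props.setProperty("transaction.timeout.ms", String.valueOf(60 * 60 * 1000));
        return props;
    }

    public static void main(String[] args) {
        long clientTimeout =
            Long.parseLong(producerProps().getProperty("transaction.timeout.ms"));
        long brokerMax = 3L * 60 * 60 * 1000; // transaction.max.timeout.ms = 3 hours, as in the thread
        // The checkpoint interval (5 minutes above) should sit well below both.
        System.out.println(clientTimeout <= brokerMax);
    }
}
```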

Best,
Tony Wei