Date: Wed, 25 Oct 2017 21:16:03 +0000 (UTC)
From: "Ismael Juma (JIRA)"
To: jira@kafka.apache.org
Subject: [jira] [Updated] (KAFKA-6119) Silent Data Loss in Kafka011 Transactional Producer

     [ https://issues.apache.org/jira/browse/KAFKA-6119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-6119:
-------------------------------
    Labels: reliability  (was: )

> Silent Data Loss in Kafka011 Transactional Producer
> ---------------------------------------------------
>
>                 Key: KAFKA-6119
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6119
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer
>    Affects Versions: 0.11.0.0, 0.11.0.1
>         Environment: openjdk version "1.8.0_144"
> OpenJDK Runtime Environment (Zulu 8.23.0.3-macosx) (build 1.8.0_144-b01)
> OpenJDK 64-Bit Server VM (Zulu 8.23.0.3-macosx) (build 25.144-b01, mixed mode)
>            Reporter: Gary Y.
>            Priority: Blocker
>              Labels: reliability
>
> Kafka can lose data published by a transactional {{KafkaProducer}} under some circumstances, i.e., data that should be committed atomically may not be fully visible from a consumer with {{read_committed}} isolation level.
>
> *Steps to reproduce:*
> # Set {{transaction.timeout.ms}} to a low value such as {{100}}.
> # Publish two messages in one transaction to different partitions of a topic, with a sufficiently long pause between the messages (e.g., 70 s).
> # Observe that only the second message is visible to a consumer using the {{read_committed}} isolation level.
>
> See https://github.com/GJL/kafka011-transactional-producer-bug-demo/blob/master/src/main/java/com/garyyao/App.java for a full example. Detailed instructions can be found in the {{README.md}}: https://github.com/GJL/kafka011-transactional-producer-bug-demo
>
> *Why is this possible?*
> Because the transaction timeout is set to a low value, the transaction is rolled back soon after the first message is sent. Indeed, the following entries appear in the broker log:
> {code}
> [2017-10-25 22:54:58,224] INFO [Transaction Coordinator 0]: Initialized transactionalId test-producer-1508964897483 with producerId 5 and producer epoch 0 on partition __transaction_state-10 (kafka.coordinator.transaction.TransactionCoordinator)
> [2017-10-25 22:55:24,260] INFO [Transaction Coordinator 0]: Completed rollback ongoing transaction of transactionalId: test-producer-1508964897483 due to timeout (kafka.coordinator.transaction.TransactionCoordinator)
> {code}
> After rollback, the second message is sent to a different partition than the first message.
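For readers who do not want to open the demo repository, the client settings implied by the steps above can be sketched as follows. This is only an illustration, not the reporter's code: the configuration keys are real Kafka names, but the class name {{ReproConfig}}, the broker address, the transactional id and the group id are placeholders assumed here.

```java
import java.util.Properties;

// Sketch of the client settings behind the reproduction steps.
// Only the configuration keys are real Kafka names; the class name, the
// broker address, the transactional id and the group id are hypothetical.
final class ReproConfig {

    /** Producer settings: transactional, with a very short transaction timeout. */
    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092"); // assumed local broker
        p.setProperty("transactional.id", "test-producer");   // placeholder id
        p.setProperty("transaction.timeout.ms", "100");       // step 1: low timeout
        return p;
    }

    /** Consumer settings: only committed transactional data should be returned. */
    static Properties consumerProps() {
        Properties c = new Properties();
        c.setProperty("bootstrap.servers", "localhost:9092"); // assumed local broker
        c.setProperty("group.id", "repro-consumer");          // placeholder group
        c.setProperty("isolation.level", "read_committed");   // step 3: isolation level
        return c;
    }

    public static void main(String[] args) {
        System.out.println("producer: " + producerProps());
        System.out.println("consumer: " + consumerProps());
    }
}
```

In a real client these objects, plus serializer settings omitted here, would be passed to the {{KafkaProducer}} and {{KafkaConsumer}} constructors. The producer would then call {{initTransactions()}} and {{beginTransaction()}}, send one record to each partition with the pause in between, and finish with {{commitTransaction()}}.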
> Upon transaction commit, {{org.apache.kafka.clients.producer.internals.TransactionManager}} may enqueue the request {{addPartitionsToTransactionHandler}}:
> {code}
>     private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
>         if (!newPartitionsInTransaction.isEmpty())
>             enqueueRequest(addPartitionsToTransactionHandler());
>         EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
>                 producerIdAndEpoch.epoch, transactionResult);
>         EndTxnHandler handler = new EndTxnHandler(builder);
>         enqueueRequest(handler);
>         return handler.result;
>     }
> {code}
> As can be seen, the request is enqueued if {{newPartitionsInTransaction}} is non-empty. I suspect that this condition is satisfied because the second message goes to a different partition.
> In {{KafkaApis.scala}}, I can see that {{handleAddPartitionToTxnRequest}} may eventually call {{prepareAddPartitions}}:
> {code}
>   def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long): TxnTransitMetadata = {
>     val newTxnStartTimestamp = state match {
>       case Empty | CompleteAbort | CompleteCommit => updateTimestamp
>       case _ => txnStartTimestamp
>     }
>     prepareTransitionTo(Ongoing, producerId, producerEpoch, txnTimeoutMs, (topicPartitions ++ addedTopicPartitions).toSet,
>       newTxnStartTimestamp, updateTimestamp)
>   }
> {code}
> Note that the first argument passed to {{prepareTransitionTo}}, {{newState}}, is always *Ongoing* here. I suspect that this puts the transaction, which should be aborted, back into the _Ongoing_ state.
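The suspected sequence can be replayed with a small toy model. This is a sketch of the reporter's hypothesis only and is not Kafka's code: the class {{TxnModel}} and its methods are hypothetical, the state set is reduced to four states, and the assumption that a timeout abort clears the partition set is made purely for illustration. Only {{addPartitions}} is modelled on the quoted {{prepareAddPartitions}}.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Toy model of the suspected sequence. NOT Kafka's code: all names are
// hypothetical, and abortOnTimeout() clearing the partition set is an
// assumption made for illustration.
final class TxnModel {
    enum State { EMPTY, ONGOING, COMPLETE_COMMIT, COMPLETE_ABORT }

    State state = State.EMPTY;
    long txnStartTimestamp = -1L;
    final Set<Integer> partitions = new TreeSet<>();

    /** Modelled on the quoted prepareAddPartitions: the target state is always ONGOING. */
    void addPartitions(Set<Integer> added, long updateTimestamp) {
        boolean finishedOrEmpty = state == State.EMPTY
                || state == State.COMPLETE_ABORT
                || state == State.COMPLETE_COMMIT;
        if (finishedOrEmpty) {
            txnStartTimestamp = updateTimestamp; // start timestamp is reset
        }
        partitions.addAll(added);
        state = State.ONGOING; // the previous state is not checked for an abort
    }

    /** Assumed coordinator behaviour once transaction.timeout.ms has expired. */
    void abortOnTimeout() {
        partitions.clear();
        state = State.COMPLETE_ABORT;
    }

    /** Returns the partitions whose writes this model treats as committed. */
    Set<Integer> commit() {
        Set<Integer> committed = new TreeSet<>(partitions);
        partitions.clear();
        state = State.COMPLETE_COMMIT;
        return committed;
    }

    /** Replays the reported scenario and returns the committed partitions. */
    static Set<Integer> replay() {
        TxnModel txn = new TxnModel();
        txn.addPartitions(Collections.singleton(0), 0L);      // first message, partition 0
        txn.abortOnTimeout();                                 // timeout fires during the pause
        txn.addPartitions(Collections.singleton(1), 70_000L); // second message, partition 1
        return txn.commit();
    }

    public static void main(String[] args) {
        System.out.println("committed partitions: " + replay()); // prints: committed partitions: [1]
    }
}
```

Under these assumptions the commit covers only partition 1, which matches the symptom that only the second message is visible with {{read_committed}}. Whether the broker actually behaves this way is exactly what the issue asks to be verified.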