kafka-jira mailing list archives

From "Apurva Mehta (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6119) Silent Data Loss in Kafka011 Transactional Producer
Date Wed, 25 Oct 2017 21:27:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219548#comment-16219548 ]

Apurva Mehta commented on KAFKA-6119:

Thanks for the report.

Can you share the actual Kafka data logs for the partitions in question? Could you also share
the TRACE-level logs for the producer?

The producer epoch is bumped on a transaction timeout, so any future messages from the producer
with the old epoch should result in a {{ProducerFencedException}}.  So in your program, the
second send should result in a {{ProducerFencedException}} and no further operations should
be allowed on the producer. 
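
The expected fencing behaviour can be illustrated with a toy model. This is only a sketch, not Kafka code: {{ToyCoordinator}}, its methods, and the use of {{IllegalStateException}} as a stand-in for {{ProducerFencedException}} are all hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for the transaction coordinator; NOT Kafka's real class. */
class ToyCoordinator {
    // Current producer epoch per transactional id (hypothetical bookkeeping).
    private final Map<String, Integer> currentEpoch = new HashMap<>();

    /** Registers a transactional id and returns its initial epoch (0). */
    int init(String transactionalId) {
        currentEpoch.put(transactionalId, 0);
        return 0;
    }

    /** Models a transaction timeout: the coordinator bumps the epoch. */
    void onTransactionTimeout(String transactionalId) {
        currentEpoch.merge(transactionalId, 1, Integer::sum);
    }

    /** Accepts a request only if it does not carry a stale epoch. */
    void checkEpoch(String transactionalId, int requestEpoch) {
        Integer current = currentEpoch.get(transactionalId);
        if (current == null) {
            throw new IllegalArgumentException("unknown transactional id: " + transactionalId);
        }
        if (requestEpoch < current) {
            // Stand-in for ProducerFencedException in this toy model.
            throw new IllegalStateException(
                "fenced: request epoch " + requestEpoch + " < current epoch " + current);
        }
    }
}

class EpochFencingDemo {
    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();
        int epoch = coordinator.init("test-producer");
        coordinator.checkEpoch("test-producer", epoch);     // first send: accepted
        coordinator.onTransactionTimeout("test-producer");  // timeout bumps the epoch
        try {
            coordinator.checkEpoch("test-producer", epoch); // second send: stale epoch
            System.out.println("second send accepted");
        } catch (IllegalStateException e) {
            System.out.println("second send rejected: " + e.getMessage());
        }
    }
}
```

In this model the second send is rejected because it still carries epoch 0 after the timeout has moved the coordinator to epoch 1.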


> Silent Data Loss in Kafka011 Transactional Producer
> ---------------------------------------------------
>                 Key: KAFKA-6119
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6119
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer 
>    Affects Versions:,
>         Environment: openjdk version "1.8.0_144"
> OpenJDK Runtime Environment (Zulu (build 1.8.0_144-b01)
> OpenJDK 64-Bit Server VM (Zulu (build 25.144-b01, mixed mode)
>            Reporter: Gary Y.
>            Priority: Blocker
>              Labels: reliability
> Kafka can lose data published by a transactional {{KafkaProducer}} under some circumstances,
i.e., data that should be committed atomically may not be fully visible from a consumer with
{{read_committed}} isolation level.
> *Steps to reproduce:*
> # Set {{transaction.timeout.ms}} to a low value such as {{100}}
> # Publish two messages in one transaction to different partitions of a topic, with a sufficiently
long pause between the messages (e.g., 70 s).
> # Only the second message is visible with {{read_committed}} isolation level.
> See 
> https://github.com/GJL/kafka011-transactional-producer-bug-demo/blob/master/src/main/java/com/garyyao/App.java
for a full example. Detailed instructions can be found in the {{README.md}}: https://github.com/GJL/kafka011-transactional-producer-bug-demo
> *Why is this possible?*
> Because the transaction timeout is set to a low value, the transaction will be rolled
back quickly after the first message is sent. Indeed, in the broker the following logs could
be found:
> {code}
> [2017-10-25 22:54:58,224] INFO [Transaction Coordinator 0]: Initialized transactionalId test-producer-1508964897483 with producerId 5 and producer epoch 0 on partition __transaction_state-10
> [2017-10-25 22:55:24,260] INFO [Transaction Coordinator 0]: Completed rollback ongoing transaction of transactionalId: test-producer-1508964897483 due to timeout (kafka.coordinator.transaction.TransactionCoordinator)
> {code}
> After rollback, the second message is sent to a different partition than the first message.

> Upon transaction commit, {{org.apache.kafka.clients.producer.internals.TransactionManager}}
may enqueue the request {{addPartitionsToTransactionHandler}}:
> {code}
>     private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
>         if (!newPartitionsInTransaction.isEmpty())
>             enqueueRequest(addPartitionsToTransactionHandler());
>         EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
>                 producerIdAndEpoch.epoch, transactionResult);
>         EndTxnHandler handler = new EndTxnHandler(builder);
>         enqueueRequest(handler);
>         return handler.result;
>     }
> {code}
> As can be seen, the condition is fulfilled if {{newPartitionsInTransaction}} is non-empty.
I suspect that, because the second message goes to a different partition, this condition is satisfied.
> In {{KafkaApis.scala}}, I can see that {{handleAddPartitionToTxnRequest}} may eventually
call {{TransactionMetadata#prepareAddPartitions}}:
> {code}
>   def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long): TxnTransitMetadata = {
>     val newTxnStartTimestamp = state match {
>       case Empty | CompleteAbort | CompleteCommit => updateTimestamp
>       case _ => txnStartTimestamp
>     }
>     prepareTransitionTo(Ongoing, producerId, producerEpoch, txnTimeoutMs, (topicPartitions ++ addedTopicPartitions).toSet,
>       newTxnStartTimestamp, updateTimestamp)
>   }
> {code}
> Note that the first argument, {{newState}}, passed to {{prepareTransitionTo}} is always *Ongoing* here. I suspect
that this puts the transaction, which should be aborted, back into the _Ongoing_ state.
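
The mechanism suspected in the report can be made concrete with a small toy model. This is only a sketch under stated assumptions, not the broker's code: {{ToyTxnState}}, {{ToyTxnMetadata}} and their methods are hypothetical, only the states named in the quoted Scala are modelled, the abort is assumed to clear the partition set, and no epoch check is modelled at all.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Subset of the states named in the quoted Scala; simplified for illustration. */
enum ToyTxnState { EMPTY, ONGOING, COMPLETE_ABORT, COMPLETE_COMMIT }

/** Toy stand-in for the coordinator's per-transaction metadata; NOT Kafka's class. */
class ToyTxnMetadata {
    ToyTxnState state = ToyTxnState.EMPTY;
    long txnStartTimestamp = -1L;
    final Set<String> partitions = new HashSet<>();

    /**
     * Mirrors the shape of the quoted prepareAddPartitions: pick the start
     * timestamp based on the current state, then always move to ONGOING.
     */
    void addPartitions(Set<String> added, long updateTimestamp) {
        switch (state) {
            case EMPTY:
            case COMPLETE_ABORT:
            case COMPLETE_COMMIT:
                // A finished or empty transaction gets a fresh start timestamp.
                txnStartTimestamp = updateTimestamp;
                break;
            default:
                // An ongoing transaction keeps its original start timestamp.
                break;
        }
        partitions.addAll(added);
        state = ToyTxnState.ONGOING; // unconditional, as in the quoted code
    }

    /** Models the timeout rollback; clearing the partitions is an assumption. */
    void abortOnTimeout() {
        partitions.clear();
        state = ToyTxnState.COMPLETE_ABORT;
    }
}

class ReviveAbortedTxnDemo {
    public static void main(String[] args) {
        ToyTxnMetadata txn = new ToyTxnMetadata();
        txn.addPartitions(Collections.singleton("topic-0"), 0L);      // first message
        txn.abortOnTimeout();                                         // coordinator times out
        txn.addPartitions(Collections.singleton("topic-1"), 70_000L); // second message
        System.out.println(txn.state + " " + txn.partitions);
    }
}
```

In this model the aborted transaction ends up _Ongoing_ again and tracks only the second partition, which matches the symptom described in the report. Whether the real broker can reach this state depends on checks that the sketch leaves out.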
