kafka-jira mailing list archives

From "Gary Y. (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-6119) Silent Data Loss in Kafka011 Transactional Producer
Date Wed, 25 Oct 2017 21:14:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-6119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gary Y. updated KAFKA-6119:
---------------------------
    Description: 
Kafka can lose data published by a transactional {{KafkaProducer}} under some circumstances, i.e., data that should be committed atomically may not be fully visible to a consumer with the {{read_committed}} isolation level.
 
*Steps to reproduce:*
# Set {{transaction.timeout.ms}} to a low value such as {{100}} (milliseconds).
# Publish two messages in one transaction to different partitions of a topic, with a sufficiently long delay between the messages (e.g., 70 s).
# Observe that only the second message is visible to a consumer with the {{read_committed}} isolation level.

See https://github.com/GJL/kafka011-transactional-producer-bug-demo/blob/master/src/main/java/com/garyyao/App.java for a full example. Detailed instructions can be found in the {{README.md}}: https://github.com/GJL/kafka011-transactional-producer-bug-demo
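
For quick reference, below is a condensed sketch of the reproduction (the linked {{App.java}} is the authoritative version). The broker address {{localhost:9092}}, the two-partition topic {{test-topic}}, and the record keys are placeholders of my own:
{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionTimeoutRepro {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-producer-" + System.currentTimeMillis());
        // Step 1: low transaction timeout so the coordinator aborts the transaction during the pause.
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 100);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            // Step 2: two messages in one transaction, targeting different partitions,
            // with a pause that is much longer than transaction.timeout.ms.
            producer.send(new ProducerRecord<>("test-topic", 0, "key-0", "first"));
            Thread.sleep(70_000);
            producer.send(new ProducerRecord<>("test-topic", 1, "key-1", "second"));
            // The commit appears to succeed even though the transaction was rolled back.
            producer.commitTransaction();
        }
    }
}
{code}
Step 3 is then to read the topic with {{isolation.level=read_committed}}; only the second record comes back.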

*Why is this possible?*
Because the transaction timeout is set to a low value, the transaction is rolled back quickly after the first message is sent. Indeed, the following entries can be found in the broker log:
{code}
[2017-10-25 22:54:58,224] INFO [Transaction Coordinator 0]: Initialized transactionalId test-producer-1508964897483 with producerId 5 and producer epoch 0 on partition __transaction_state-10 (kafka.coordinator.transaction.TransactionCoordinator)
[2017-10-25 22:55:24,260] INFO [Transaction Coordinator 0]: Completed rollback ongoing transaction of transactionalId: test-producer-1508964897483 due to timeout (kafka.coordinator.transaction.TransactionCoordinator)
{code}

After rollback, the second message is sent to a different partition than the first message.

Upon transaction commit, {{org.apache.kafka.clients.producer.internals.TransactionManager}} may enqueue the request handler returned by {{addPartitionsToTransactionHandler()}}:
{code}
    private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
        if (!newPartitionsInTransaction.isEmpty())
            enqueueRequest(addPartitionsToTransactionHandler());
        EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
                producerIdAndEpoch.epoch, transactionResult);
        EndTxnHandler handler = new EndTxnHandler(builder);
        enqueueRequest(handler);
        return handler.result;
    }
{code}

As can be seen, the condition is fulfilled if {{newPartitionsInTransaction}} is non-empty. I suspect that this condition is satisfied because the second message goes to a different partition than the first.

In {{KafkaApis.scala}}, I can see that {{handleAddPartitionToTxnRequest}} may eventually call {{prepareAddPartitions}}:
{code}
  def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long): TxnTransitMetadata = {
    val newTxnStartTimestamp = state match {
      case Empty | CompleteAbort | CompleteCommit => updateTimestamp
      case _ => txnStartTimestamp
    }

    prepareTransitionTo(Ongoing, producerId, producerEpoch, txnTimeoutMs, (topicPartitions ++ addedTopicPartitions).toSet,
      newTxnStartTimestamp, updateTimestamp)
  }
{code}

Note that the first argument {{newState}} passed to {{prepareTransitionTo}} is always *Ongoing* here. I suspect that this puts the transaction, which should have been aborted, back into the _Ongoing_ state.
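
To make the suspected sequence concrete, here is a toy illustration in plain Java. This is not Kafka code: the state names merely mirror the coordinator's {{TransactionState}}, and the comments describe the sequence of events as I understand it:
{code}
// Toy illustration only -- not Kafka code.
public class SuspectedTimeline {

    enum TxnState { EMPTY, ONGOING, PREPARE_ABORT, COMPLETE_ABORT, PREPARE_COMMIT, COMPLETE_COMMIT }

    public static void main(String[] args) {
        TxnState state = TxnState.ONGOING;   // first message sent, its partition added to the transaction

        state = TxnState.COMPLETE_ABORT;     // coordinator rolls the transaction back after transaction.timeout.ms

        // The producer is unaware of the rollback and sends the second message to a different partition.
        // That partition is still pending in newPartitionsInTransaction, so commitTransaction() first
        // enqueues an AddPartitionsToTxn request, and prepareAddPartitions transitions the
        // already-aborted transaction to Ongoing again:
        state = TxnState.ONGOING;

        // The subsequent EndTxn(COMMIT) then completes a transaction that contains only the
        // second message, so the first message is silently lost.
        state = TxnState.COMPLETE_COMMIT;

        System.out.println("observed final state: " + state);
    }
}
{code}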




> Silent Data Loss in Kafka011 Transactional Producer
> ---------------------------------------------------
>
>                 Key: KAFKA-6119
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6119
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer 
>    Affects Versions: 0.11.0.0, 0.11.0.1
>         Environment: openjdk version "1.8.0_144"
> OpenJDK Runtime Environment (Zulu 8.23.0.3-macosx) (build 1.8.0_144-b01)
> OpenJDK 64-Bit Server VM (Zulu 8.23.0.3-macosx) (build 25.144-b01, mixed mode)
>            Reporter: Gary Y.
>            Priority: Blocker
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
