kafka-dev mailing list archives

From "William Parker (JIRA)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-8674) Records that exceed maximum size on the Java Producer don't cause transaction failure
Date Tue, 16 Jul 2019 16:16:00 GMT
William Parker created KAFKA-8674:
-------------------------------------

             Summary: Records that exceed maximum size on the Java Producer don't cause transaction failure
                 Key: KAFKA-8674
                 URL: https://issues.apache.org/jira/browse/KAFKA-8674
             Project: Kafka
          Issue Type: Bug
          Components: producer 
    Affects Versions: 2.1.0
            Reporter: William Parker


When using transactions, the [documentation for the Java producer's send method|https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]
states that
{code:java}
When used as part of a transaction, it is not necessary to define a callback or check the
result of the future in order to detect errors from send. If any of the send calls failed
with an irrecoverable error, the final commitTransaction() call will fail and throw the exception
from the last failed send. When this happens, your application should call abortTransaction()
to reset the state and continue to send data.
{code}
However, when a record exceeds the maximum message size, this is not the behavior we have observed; rather,
the commitTransaction call succeeds and the oversized record is silently dropped while other
records in the transaction are delivered. The send method returns a KafkaProducer$FutureFailure, and
calling the .get method on this future throws a RecordTooLargeException, but according
to the documentation checking the future should not be necessary.

I believe this occurs because the doSend method makes an [ensureValidRecordSize call|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L893]
that throws an exception early in its body, before the later calls involving the TransactionManager
occur; in effect, the fact that this send was ever attempted is hidden from the TransactionManager.
The logic that actually enqueues the record for sending appears to be [here|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L899],
pasted below:
{code:java}
if (transactionManager != null && transactionManager.isTransactional())
    transactionManager.maybeAddPartitionToTransaction(tp);

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch",
            record.topic(), partition);
    this.sender.wakeup();
}
{code}
Specifically, as I understand this process, the Sender [retrieves data from the RecordAccumulator|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L342].
The [TransactionManager|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java]
maintains state that indicates whether the transaction has failed, and when the commitTransaction
call is made this state is checked. This state appears to be updated by the Sender, for example
in [failBatch|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L686].

The crucial point, however, is that when ensureValidRecordSize throws an exception, the
interacting code that would mark the transaction as failed (spanning the RecordAccumulator, the
TransactionManager, and the Sender) is never reached.
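
This also explains why send does not throw directly but instead returns a failed future: RecordTooLargeException extends ApiException, and doSend catches ApiException and wraps it. A condensed sketch of that control flow (paraphrased from the 2.1.0 source linked above, with comments added; not a verbatim copy):
{code:java}
try {
    // ... serialize key/value, compute partition ...
    ensureValidRecordSize(serializedSize); // throws RecordTooLargeException for oversized records

    // None of the code below runs for an oversized record, including the
    // transactional bookkeeping:
    if (transactionManager != null && transactionManager.isTransactional())
        transactionManager.maybeAddPartitionToTransaction(tp);
    // ... accumulator.append(...), sender.wakeup(), return future ...
} catch (ApiException e) {
    // RecordTooLargeException lands here: the error is wrapped in a failed
    // future (KafkaProducer$FutureFailure) and the TransactionManager is
    // never told that this send was attempted.
    return new FutureFailure(e);
}
{code}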

It appears to me that Kafka is not behaving as specified by its docs. Are there thoughts on
whether the docs should be changed to reflect this behavior, or whether the ensureValidRecordSize
call should be changed so that it causes the TransactionManager to fail the transaction? Alternatively,
am I perhaps missing something? Is this something you'd be interested in taking a patch for?
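
Until this is resolved, the only reliable guard we have found is to do what the javadoc says should be unnecessary: retain the futures returned by send and check them before committing. A sketch (the helper below is illustrative, not from the Kafka codebase):
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

// Sends all records in one transaction, checking every future before the
// commit so that a RecordTooLargeException actually aborts the transaction.
void sendAllOrAbort(KafkaProducer<String, String> producer,
                    List<ProducerRecord<String, String>> records) {
    producer.beginTransaction();
    try {
        List<Future<RecordMetadata>> results = new ArrayList<>();
        for (ProducerRecord<String, String> record : records)
            results.add(producer.send(record));
        for (Future<RecordMetadata> result : results)
            result.get(); // throws ExecutionException wrapping RecordTooLargeException
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction(); // commitTransaction alone would not have failed
        throw new RuntimeException(e);
    }
}
{code}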

I found this bug on a cluster running Kafka 2.1.0, but the underlying bug seems to exist in
the doSend method in [version 2.3.0|https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L908]
and in the [trunk branch at time of writing|https://github.com/apache/kafka/blob/d227f940489434c2f23491340d4399d98fd48d2d/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L908].