kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-6747 Check whether there is in-flight transaction before aborting transaction (#4826)
Date Thu, 05 Apr 2018 22:41:43 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new fd35336  KAFKA-6747 Check whether there is in-flight transaction before aborting transaction (#4826)
fd35336 is described below

commit fd35336d28660e31255070e26f55a5477ced2a83
Author: tedyu <yuzhihong@gmail.com>
AuthorDate: Thu Apr 5 15:29:04 2018 -0700

    KAFKA-6747 Check whether there is in-flight transaction before aborting transaction (#4826)
    
    As Frederic reported on the mailing list under the subject
    "kafka-streams Invalid transition attempted from state READY to state
    ABORTING_TRANSACTION", producer#abortTransaction should only be called
    when transactionInFlight is true.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <matthias@confluent.io>
---
 .../org/apache/kafka/streams/processor/internals/StreamTask.java  | 4 ++--
 .../apache/kafka/streams/processor/internals/StreamTaskTest.java  | 8 ++++++++
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d04be04..a033043 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -358,8 +358,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                     producer.commitTransaction();
                     transactionInFlight = false;
                     if (startNewTransaction) {
-                        transactionInFlight = true;
                         producer.beginTransaction();
+                        transactionInFlight = true;
                     }
                 } else {
                     consumer.commitSync(consumedOffsetsAndMetadata);
@@ -482,7 +482,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             if (eosEnabled) {
                 if (!clean) {
                     try {
-                        if (!isZombie) {
+                        if (!isZombie && transactionInFlight) {
                             producer.abortTransaction();
                         }
                         transactionInFlight = false;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a305829..d6a5276 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -783,6 +783,14 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldNotThrowOnCloseIfTaskWasNotInitializedWithEosEnabled() {
+        task = createStatelessTask(true);
+
+        assertTrue(!producer.transactionInFlight());
+        task.close(false, false);
+    }
+
+    @Test
     public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() {
         task = createStatelessTask(false);
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message