kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 1.0 updated: KAFKA-6906: Fixed to commit transactions if data is produced via wall clock punctuation (#5105)
Date Wed, 13 Jun 2018 01:28:43 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 1024ae6  KAFKA-6906: Fixed to commit transactions if data is produced via wall clock
punctuation (#5105)
1024ae6 is described below

commit 1024ae6e5747a55ad83c0f3b6c3518ee881dce0f
Author: Jagadesh Adireddi <adireddijagadesh@gmail.com>
AuthorDate: Tue Jun 12 03:10:03 2018 +0530

    KAFKA-6906: Fixed to commit transactions if data is produced via wall clock punctuation
(#5105)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>,
Guozhang Wang <guozhang@confluent.io>
---
 .../streams/processor/internals/StreamTask.java    | 14 +++---
 .../processor/internals/StreamTaskTest.java        | 51 +++++++++++++++++++++-
 .../processor/internals/StreamThreadTest.java      |  4 +-
 3 files changed, 58 insertions(+), 11 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index df17f29..cc6d7a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -353,19 +353,19 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
                 if (eosEnabled) {
                     producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
-                    producer.commitTransaction();
-                    transactionInFlight = false;
-                    if (startNewTransaction) {
-                        producer.beginTransaction();
-                        transactionInFlight = true;
-                    }
                 } else {
                     consumer.commitSync(consumedOffsetsAndMetadata);
                 }
                 commitOffsetNeeded = false;
-            } else if (eosEnabled && !startNewTransaction && transactionInFlight)
{ // need to make sure to commit txn for suspend case
+            }
+
+            if (eosEnabled) {
                 producer.commitTransaction();
                 transactionInFlight = false;
+                if (startNewTransaction) {
+                    producer.beginTransaction();
+                    transactionInFlight = true;
+                }
             }
         } catch (final CommitFailedException | ProducerFencedException fatal) {
             throw new TaskMigratedException(this, fatal);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 26b4e9a..2b79f78 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -173,7 +173,14 @@ public class StreamTaskTest {
     public void cleanup() throws IOException {
         try {
             if (task != null) {
-                task.close(true, false);
+                try {
+                    task.close(true, false);
+                } catch (final IllegalStateException canHappen) {
+                    if (!"There is no open transaction.".equals(canHappen.getMessage()))
{
+                        throw canHappen;
+                    }
+                    // swallow
+                }
             }
         } finally {
             Utils.delete(baseDir);
@@ -820,6 +827,7 @@ public class StreamTaskTest {
                 };
             }
         };
+        streamTask.initializeTopology();
 
         time.sleep(testConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
 
@@ -1070,6 +1078,7 @@ public class StreamTaskTest {
 
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
             changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task.initializeTopology();
 
         task.close(true, false);
         task = null;
@@ -1230,6 +1239,45 @@ public class StreamTaskTest {
         assertFalse(task.initializeStateStores());
     }
 
+    @Test
+    public void shouldThrowOnCleanCloseTaskWhenEosEnabledIfTransactionInFlight() {
+        final MockProducer producer = new MockProducer();
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+        try {
+            task.close(true, false);
+            fail("should have throw IllegalStateException");
+        } catch (final IllegalStateException expected) {
+            // pass
+        }
+        task = null;
+
+        assertTrue(producer.closed());
+    }
+
+    @Test
+    public void shouldAlwaysCommitIfEosEnabled() {
+        final MockProducer producer = new MockProducer();
+
+        final RecordCollectorImpl recordCollector =  new RecordCollectorImpl(producer, "StreamTask",
+                new LogContext("StreamTaskTest "));
+
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task.initializeStateStores();
+        task.initializeTopology();
+        task.punctuate(processorSystemTime, 5, PunctuationType.WALL_CLOCK_TIME, new Punctuator()
{
+            @Override
+            public void punctuate(final long timestamp) {
+                recordCollector.send("result-topic1", 3, 5, 0, time.milliseconds(),
+                        new IntegerSerializer(),  new IntegerSerializer());
+            }
+        });
+        task.commit();
+        assertEquals(1, producer.history().size());
+    }
+
     @SuppressWarnings("unchecked")
     private StreamTask createTaskThatThrowsExceptionOnClose() {
         final MockSourceNode processorNode = new MockSourceNode(topic1, intDeserializer,
intDeserializer) {
@@ -1259,5 +1307,4 @@ public class StreamTaskTest {
     private Iterable<ConsumerRecord<byte[], byte[]>> records(final ConsumerRecord<byte[],
byte[]>... recs) {
         return Arrays.asList(recs);
     }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 7d04040..7e0be76 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -851,7 +851,7 @@ public class StreamThreadTest {
             new TestCondition() {
                 @Override
                 public boolean conditionMet() {
-                    return producer.commitCount() == 1;
+                    return producer.commitCount() == 2;
                 }
             },
             "StreamsThread did not commit transaction.");
@@ -872,7 +872,7 @@ public class StreamThreadTest {
             },
             "StreamsThread did not remove fenced zombie task.");
 
-        assertThat(producer.commitCount(), equalTo(1L));
+        assertThat(producer.commitCount(), equalTo(2L));
     }
 
     @Test

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message