kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6748) Scheduler cannot be cancelled from Punctuator
Date Thu, 05 Apr 2018 18:18:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16427387#comment-16427387
] 

ASF GitHub Bot commented on KAFKA-6748:
---------------------------------------

guozhangwang closed pull request #4827: KAFKA-6748: double check before scheduling a new task
after the punctuate call
URL: https://github.com/apache/kafka/pull/4827
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
index 80eda6c9dd8..354c602cb3f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
@@ -52,7 +52,10 @@ boolean mayPunctuate(final long timestamp, final PunctuationType type,
final Pro
 
                 if (!sched.isCancelled()) {
                     processorNodePunctuator.punctuate(sched.node(), timestamp, type, sched.punctuator());
-                    pq.add(sched.next(timestamp));
+                    // sched can be cancelled from within the punctuator
+                    if (!sched.isCancelled()) {
+                        pq.add(sched.next(timestamp));
+                    }
                     punctuated = true;
                 }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index 09c7a0a3183..e799688c428 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -126,6 +127,42 @@ public void punctuate(ProcessorNode node, long time, PunctuationType
type, Punct
         assertEquals(4, processor.punctuatedAt.size());
     }
 
+    @Test
+    public void testPunctuationIntervalCancelFromPunctuator() {
+        final TestProcessor processor = new TestProcessor();
+        final ProcessorNode<String, String> node = new ProcessorNode<>("test",
processor, null);
+        final PunctuationQueue queue = new PunctuationQueue();
+        final Punctuator punctuator = new Punctuator() {
+            @Override
+            public void punctuate(long timestamp) {
+                node.processor().punctuate(timestamp);
+            }
+        };
+
+        final PunctuationSchedule sched = new PunctuationSchedule(node, 0L, 100L, punctuator);
+        final long now = sched.timestamp - 100L;
+
+        final Cancellable cancellable = queue.schedule(sched);
+
+        ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() {
+            @Override
+            public void punctuate(ProcessorNode node, long time, PunctuationType type, Punctuator
punctuator) {
+                punctuator.punctuate(time);
+                // simulate scheduler cancelled from within punctuator
+                cancellable.cancel();
+            }
+        };
+
+        queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(0, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(1, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(1, processor.punctuatedAt.size());
+    }
+
     private static class TestProcessor extends AbstractProcessor<String, String> {
 
         public final ArrayList<Long> punctuatedAt = new ArrayList<>();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Scheduler cannot be cancelled from Punctuator
> ---------------------------------------------
>
>                 Key: KAFKA-6748
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6748
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0, 1.0.1
>            Reporter: Frederic Arno
>            Assignee: Frederic Arno
>            Priority: Major
>             Fix For: 1.0.2, 1.1.1
>
>
> A Scheduler cannot be cancelled from within the scheduled punctuator, I will post a test
case to illustrate the problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message