kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [kafka] tombentley commented on a change in pull request #10605: KAFKA-12726 prevent a stuck Task.stop() from blocking subsequent Task.stops()s
Date Thu, 29 Apr 2021 11:01:00 GMT

tombentley commented on a change in pull request #10605:
URL: https://github.com/apache/kafka/pull/10605#discussion_r622876998



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -839,7 +840,22 @@ private void stopTask(ConnectorTaskId taskId) {
             ClassLoader savedLoader = plugins.currentThreadLoader();
             try {
                 savedLoader = Plugins.compareAndSwapLoaders(task.loader());
-                task.stop();
+                CountDownLatch latch = new CountDownLatch(1);
+                new Thread() {
+                    @Override
+                    public void run() {
+                        task.stop();

Review comment:
       If stop throws we won't count down the latch. No harm will result except there will
be an erroneous log messages about exceeding the stop timeout.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -824,7 +825,7 @@ private WorkerErrantRecordReporter createWorkerErrantRecordReporter(
         return null;
     }
 
-    private void stopTask(ConnectorTaskId taskId) {
+    private void stopTask(ConnectorTaskId taskId, long timeout) {

Review comment:
       `timeoutMs` would be unambigous

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -839,7 +840,22 @@ private void stopTask(ConnectorTaskId taskId) {
             ClassLoader savedLoader = plugins.currentThreadLoader();
             try {
                 savedLoader = Plugins.compareAndSwapLoaders(task.loader());
-                task.stop();
+                CountDownLatch latch = new CountDownLatch(1);
+                new Thread() {

Review comment:
       We should name the thread so that thread dumps are a bit more informative. I _think_
these should be daemon threads because if we're prepared to basically ignore the non-return
of `task.stop()` during runtime I don't see why we'd block jvm exit for them. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -839,7 +840,22 @@ private void stopTask(ConnectorTaskId taskId) {
             ClassLoader savedLoader = plugins.currentThreadLoader();
             try {
                 savedLoader = Plugins.compareAndSwapLoaders(task.loader());
-                task.stop();
+                CountDownLatch latch = new CountDownLatch(1);
+                new Thread() {
+                    @Override
+                    public void run() {
+                        task.stop();
+                        latch.countDown();
+                    }
+                }.start();
+                // Wait for thread to terminate, but not longer than timeout.
+                if (timeout <= 0) {

Review comment:
       It's not required to protect the await, but is to get the logging.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message