kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [kafka] branch 1.0 updated: KAFKA-6566: Improve Connect Resource Cleanup
Date Fri, 18 May 2018 17:40:59 GMT
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 320dd39  KAFKA-6566: Improve Connect Resource Cleanup
320dd39 is described below

commit 320dd390b2e013bbee1edbfb04e2a0d4c21d191f
Author: Robert Yokota <rayokota@gmail.com>
AuthorDate: Fri May 18 10:39:34 2018 -0700

    KAFKA-6566: Improve Connect Resource Cleanup
    
    This is a change to improve resource cleanup for sink tasks and source tasks.  Now `Task.stop()` is called from both `WorkerSinkTask.close()` and `WorkerSourceTask.close()`.
    
    It is called from `WorkerXXXTask.close()` since this method is called in the `finally` block of `WorkerTask.run()`, and Connect developers use `stop()` to clean up resources.
    
    Author: Robert Yokota <rayokota@gmail.com>
    
    Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
    
    Closes #5020 from rayokota/K6566-improve-connect-resource-cleanup
    
    (cherry picked from commit ee8abb2f7053575bd2abec8152907e0642b1d713)
    Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
---
 .../kafka/connect/runtime/WorkerSinkTask.java      | 21 +++++++++++---
 .../kafka/connect/runtime/WorkerSourceTask.java    | 33 +++++++++++++++++++---
 2 files changed, 46 insertions(+), 8 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index ac2744e..de75cba 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -141,10 +141,23 @@ class WorkerSinkTask extends WorkerTask {
     protected void close() {
         // FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
         // passed in
-        task.stop();
-        if (consumer != null)
-            consumer.close();
-        transformationChain.close();
+        try {
+            task.stop();
+        } catch (Throwable t) {
+            log.warn("Could not stop task", t);
+        }
+        if (consumer != null) {
+            try {
+                consumer.close();
+            } catch (Throwable t) {
+                log.warn("Could not close consumer", t);
+            }
+        }
+        try {
+            transformationChain.close();
+        } catch (Throwable t) {
+            log.warn("Could not close transformation chain", t);
+        }
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 473e235..59071b7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -82,6 +82,7 @@ class WorkerSourceTask extends WorkerTask {
     private Map<String, String> taskConfig;
     private boolean finishedStart = false;
     private boolean startedShutdownBeforeStartCompleted = false;
+    private boolean stopped = false;
 
     public WorkerSourceTask(ConnectorTaskId id,
                             SourceTask task,
@@ -129,8 +130,21 @@ class WorkerSourceTask extends WorkerTask {
     }
 
     protected void close() {
-        producer.close(30, TimeUnit.SECONDS);
-        transformationChain.close();
+        if (!shouldPause()) {
+            tryStop();
+        }
+        if (producer != null) {
+            try {
+                producer.close(30, TimeUnit.SECONDS);
+            } catch (Throwable t) {
+                log.warn("Could not close producer", t);
+            }
+        }
+        try {
+            transformationChain.close();
+        } catch (Throwable t) {
+            log.warn("Could not close transformation chain", t);
+        }
     }
 
     @Override
@@ -144,12 +158,23 @@ class WorkerSourceTask extends WorkerTask {
         stopRequestedLatch.countDown();
         synchronized (this) {
             if (finishedStart)
-                task.stop();
+                tryStop();
             else
                 startedShutdownBeforeStartCompleted = true;
         }
     }
 
+    private synchronized void tryStop() {
+        if (!stopped) {
+            try {
+                task.stop();
+                stopped = true;
+            } catch (Throwable t) {
+                log.warn("Could not stop task", t);
+            }
+        }
+    }
+
     @Override
     public void execute() {
         try {
@@ -158,7 +183,7 @@ class WorkerSourceTask extends WorkerTask {
             log.info("{} Source task finished initialization and start", this);
             synchronized (this) {
                 if (startedShutdownBeforeStartCompleted) {
-                    task.stop();
+                    tryStop();
                     return;
                 }
                 finishedStart = true;

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.

Mime
View raw message