kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task
Date Thu, 15 Mar 2018 22:55:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401219#comment-16401219
] 

ASF GitHub Bot commented on KAFKA-6661:
---------------------------------------

ewencp closed pull request #4716: KAFKA-6661: Ensure sink connectors don’t resume consumer
when task is paused
URL: https://github.com/apache/kafka/pull/4716
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 2995a4e813a..2ba785c4668 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -130,7 +130,7 @@ public void initialize(TaskConfig taskConfig) {
         try {
             this.taskConfig = taskConfig.originalsStrings();
             this.consumer = createConsumer();
-            this.context = new WorkerSinkTaskContext(consumer);
+            this.context = new WorkerSinkTaskContext(consumer, this);
         } catch (Throwable t) {
             log.error("{} Task failed initialization and will not be started.", this, t);
             onFailure(t);
@@ -601,7 +601,7 @@ SinkTaskMetricsGroup sinkTaskMetricsGroup() {
     private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            log.debug("{} Partitions assigned", WorkerSinkTask.this);
+            log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
             lastCommittedOffsets = new HashMap<>();
             currentOffsets = new HashMap<>();
             for (TopicPartition tp : partitions) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index 08789497645..386f992e82a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -20,6 +20,8 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.IllegalWorkerStateException;
 import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -29,26 +31,32 @@
 import java.util.Set;
 
 public class WorkerSinkTaskContext implements SinkTaskContext {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
     private Map<TopicPartition, Long> offsets;
     private long timeoutMs;
     private KafkaConsumer<byte[], byte[]> consumer;
+    private final WorkerSinkTask sinkTask;
     private final Set<TopicPartition> pausedPartitions;
     private boolean commitRequested;
 
-    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
+    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer, WorkerSinkTask
sinkTask) {
         this.offsets = new HashMap<>();
         this.timeoutMs = -1L;
         this.consumer = consumer;
+        this.sinkTask = sinkTask;
         this.pausedPartitions = new HashSet<>();
     }
 
     @Override
     public void offset(Map<TopicPartition, Long> offsets) {
+        log.debug("{} Setting offsets for topic partitions {}", this, offsets);
         this.offsets.putAll(offsets);
     }
 
     @Override
     public void offset(TopicPartition tp, long offset) {
+        log.debug("{} Setting offset for topic partition {} to {}", this, tp, offset);
         offsets.put(tp, offset);
     }
 
@@ -66,6 +74,7 @@ public void clearOffsets() {
 
     @Override
     public void timeout(long timeoutMs) {
+        log.debug("{} Setting timeout to {} ms", this, timeoutMs);
         this.timeoutMs = timeoutMs;
     }
 
@@ -92,7 +101,12 @@ public void pause(TopicPartition... partitions) {
         }
         try {
             Collections.addAll(pausedPartitions, partitions);
-            consumer.pause(Arrays.asList(partitions));
+            if (sinkTask.shouldPause()) {
+                log.debug("{} Connector is paused, so not pausing consumer's partitions {}",
this, partitions);
+            } else {
+                consumer.pause(Arrays.asList(partitions));
+                log.debug("{} Pausing partitions {}. Connector is not paused.", this, partitions);
+            }
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not pause partitions that
are not currently assigned to them.", e);
         }
@@ -105,7 +119,12 @@ public void resume(TopicPartition... partitions) {
         }
         try {
             pausedPartitions.removeAll(Arrays.asList(partitions));
-            consumer.resume(Arrays.asList(partitions));
+            if (sinkTask.shouldPause()) {
+                log.debug("{} Connector is paused, so not resuming consumer's partitions
{}", this, partitions);
+            } else {
+                consumer.resume(Arrays.asList(partitions));
+                log.debug("{} Resuming partitions: {}", this, partitions);
+            }
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not resume partitions that
are not currently assigned to them.", e);
         }
@@ -117,6 +136,7 @@ public void resume(TopicPartition... partitions) {
 
     @Override
     public void requestCommit() {
+        log.debug("{} Requesting commit", this);
         commitRequested = true;
     }
 
@@ -128,4 +148,10 @@ public void clearCommitRequest() {
         commitRequested = false;
     }
 
+    @Override
+    public String toString() {
+        return "WorkerSinkTaskContext{" +
+               "id=" + sinkTask.id +
+               '}';
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Sink connectors that explicitly 'resume' topic partitions can resume a paused task
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-6661
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6661
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0, 1.0.0
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Critical
>
> Sink connectors are allowed to use the {{SinkTaskContext}}'s methods to explicitly pause
and resume topic partitions. This is useful when connectors need additional time processing
the records for specific topic partitions (e.g., the external system has an outage).
> However, when the sink connector has been paused via the REST API, the worker for the
sink tasks pauses the consumer. When the connector is polled, the poll request might time out
and return no records. Connect then calls the task's {{put(...)}} method (with no records),
and this allows the task to optionally call any of the {{SinkTaskContext}}'s pause or resume
methods. If it calls resume, this will unexpectedly resume the paused consumer, causing the
consumer to return messages and the connector to process those messages -- despite the connector
still being paused.
> This is reported against 1.0, but the affected code has not been changed since at least
0.9.0.0.
> A workaround is to remove rather than pause a connector. It's inconvenient, but it works.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message