kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [kafka] branch 0.10.2 updated: KAFKA-6661: Ensure sink connectors don’t resume consumer when task is paused
Date Thu, 15 Mar 2018 23:08:23 GMT
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 0.10.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.10.2 by this push:
     new df5d142  KAFKA-6661: Ensure sink connectors don’t resume consumer when task is paused
df5d142 is described below

commit df5d1425944be88853b8e109d0b08f0002fdff34
Author: Randall Hauch <rhauch@gmail.com>
AuthorDate: Thu Mar 15 15:52:53 2018 -0700

    KAFKA-6661: Ensure sink connectors don’t resume consumer when task is paused
    
    Changed WorkerSinkTaskContext to only resume the consumer topic partitions when the connector/task is not in the paused state.
    
    The context tracks the set of topic partitions that are explicitly paused/resumed by the connector, and when the WorkerSinkTask resumes the tasks it currently resumes all topic partitions *except* those that are still explicitly paused in the context. Therefore, the change above should result in the desired behavior.
    
    Several debug statements were added to record when the context is called by the connector.
    
    This can be backported to older releases, since this bug goes back to 0.10 or 0.9.
    
    Author: Randall Hauch <rhauch@gmail.com>
    
    Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
    
    Closes #4716 from rhauch/kafka-6661
    
    (cherry picked from commit e7ef719a5bc0d1276f0e9482d59b25406fda276b)
    Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
---
 .../kafka/connect/runtime/WorkerSinkTask.java      |  4 +--
 .../connect/runtime/WorkerSinkTaskContext.java     | 39 ++++++++++++++++++----
 2 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 1111dd1..8b4e646 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -110,7 +110,7 @@ class WorkerSinkTask extends WorkerTask {
         try {
             this.taskConfig = taskConfig.originalsStrings();
             this.consumer = createConsumer();
-            this.context = new WorkerSinkTaskContext(consumer);
+            this.context = new WorkerSinkTaskContext(consumer, this);
         } catch (Throwable t) {
             log.error("Task {} failed initialization and will not be started.", t);
             onFailure(t);
@@ -511,7 +511,7 @@ class WorkerSinkTask extends WorkerTask {
     private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            log.debug("{} Partitions assigned", WorkerSinkTask.this);
+            log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
             lastCommittedOffsets = new HashMap<>();
             currentOffsets = new HashMap<>();
             for (TopicPartition tp : partitions) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index ede76c4..288da78 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -15,34 +15,43 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.IllegalWorkerStateException;
 import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 public class WorkerSinkTaskContext implements SinkTaskContext {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
     private Map<TopicPartition, Long> offsets;
     private long timeoutMs;
     private KafkaConsumer<byte[], byte[]> consumer;
+    private final WorkerSinkTask sinkTask;
     private final Set<TopicPartition> pausedPartitions;
     private boolean commitRequested;
 
-    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
+    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer, WorkerSinkTask
sinkTask) {
         this.offsets = new HashMap<>();
         this.timeoutMs = -1L;
         this.consumer = consumer;
+        this.sinkTask = sinkTask;
         this.pausedPartitions = new HashSet<>();
     }
 
     @Override
     public void offset(Map<TopicPartition, Long> offsets) {
+        log.debug("{} Setting offsets for topic partitions {}", this, offsets);
         this.offsets.putAll(offsets);
     }
 
     @Override
     public void offset(TopicPartition tp, long offset) {
+        log.debug("{} Setting offset for topic partition {} to {}", this, tp, offset);
         offsets.put(tp, offset);
     }
 
@@ -60,6 +69,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
 
     @Override
     public void timeout(long timeoutMs) {
+        log.debug("{} Setting timeout to {} ms", this, timeoutMs);
         this.timeoutMs = timeoutMs;
     }
 
@@ -85,9 +95,13 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
             throw new IllegalWorkerStateException("SinkTaskContext may not be used to pause
consumption until the task is initialized");
         }
         try {
-            for (TopicPartition partition : partitions)
-                pausedPartitions.add(partition);
-            consumer.pause(Arrays.asList(partitions));
+            Collections.addAll(pausedPartitions, partitions);
+            if (sinkTask.shouldPause()) {
+                log.debug("{} Connector is paused, so not pausing consumer's partitions {}",
this, partitions);
+            } else {
+                consumer.pause(Arrays.asList(partitions));
+                log.debug("{} Pausing partitions {}. Connector is not paused.", this, partitions);
+            }
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not pause partitions that
are not currently assigned to them.", e);
         }
@@ -99,9 +113,13 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
             throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume
consumption until the task is initialized");
         }
         try {
-            for (TopicPartition partition : partitions)
-                pausedPartitions.remove(partition);
-            consumer.resume(Arrays.asList(partitions));
+            pausedPartitions.removeAll(Arrays.asList(partitions));
+            if (sinkTask.shouldPause()) {
+                log.debug("{} Connector is paused, so not resuming consumer's partitions
{}", this, partitions);
+            } else {
+                consumer.resume(Arrays.asList(partitions));
+                log.debug("{} Resuming partitions: {}", this, partitions);
+            }
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not resume partitions that
are not currently assigned to them.", e);
         }
@@ -113,6 +131,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
 
     @Override
     public void requestCommit() {
+        log.debug("{} Requesting commit", this);
         commitRequested = true;
     }
 
@@ -124,4 +143,10 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
         commitRequested = false;
     }
 
+    @Override
+    public String toString() {
+        return "WorkerSinkTaskContext{" +
+               "id=" + sinkTask.id +
+               '}';
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.

Mime
View raw message