kafka-commits mailing list archives

From: ewe...@apache.org
Subject: [kafka] branch trunk updated: KAFKA-7551: Refactor to create producer & consumer in the worker
Date: Fri, 30 Nov 2018 07:39:08 GMT
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ace4dd0  KAFKA-7551: Refactor to create producer & consumer in the worker
ace4dd0 is described below

commit ace4dd00566afb7d04235bbbcc50097191af0fec
Author: Magesh Nandakumar <magesh.n.kumar@gmail.com>
AuthorDate: Thu Nov 29 23:38:50 2018 -0800

    KAFKA-7551: Refactor to create producer & consumer in the worker
    
    This is a minor refactoring that moves the creation of the producer and consumer into the
    Worker. Previously, the consumer was created in the WorkerSinkTask. This should not affect
    any functionality; it just makes the code structure easier to understand.
    
    Author: Magesh Nandakumar <magesh.n.kumar@gmail.com>
    
    Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Randall Hauch <rhauch@gmail.com>,
    Robert Yokota <rayokota@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
    
    Closes #5842 from mageshn/KAFKA-7551
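    
    In sketch form, the change to the sink-task wiring looks like this (condensed
    from the Worker.java hunk below; id, config, and the other arguments are the
    locals already in scope in the worker's task-building code):
    
        // Before this commit, WorkerSinkTask built its own KafkaConsumer in a
        // private createConsumer() method. After it, the Worker constructs the
        // consumer from the new consumerConfigs() helper and injects it:
        Map<String, Object> consumerProps = consumerConfigs(id, config);
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
        return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState,
                                  config, configState, metrics, keyConverter,
                                  valueConverter, headerConverter, transformationChain,
                                  consumer, loader, time, retryWithToleranceOperator);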
---
 .../org/apache/kafka/connect/runtime/Worker.java   | 62 ++++++++++++++------
 .../kafka/connect/runtime/WorkerSinkTask.java      | 31 +---------
 .../connect/runtime/ErrorHandlingTaskTest.java     | 10 ++--
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  | 13 ++---
 .../runtime/WorkerSinkTaskThreadedTest.java        |  6 +-
 .../apache/kafka/connect/runtime/WorkerTest.java   | 66 +++++++++++++++++++++-
 6 files changed, 123 insertions(+), 65 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 81a165c..673bd4e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.MetricName;
@@ -50,6 +52,7 @@ import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.SinkUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,7 +92,6 @@ public class Worker {
     private final Converter internalKeyConverter;
     private final Converter internalValueConverter;
     private final OffsetBackingStore offsetBackingStore;
-    private final Map<String, Object> producerProps;
 
     private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
     private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
@@ -129,19 +131,6 @@ public class Worker {
 
         this.workerConfigTransformer = initConfigTransformer();
 
-        producerProps = new HashMap<>();
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        // These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the
-        // worker, but this may compromise the delivery guarantees of Kafka Connect.
-        producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
-        producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
-        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
-        producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
-        // User-specified overrides
-        producerProps.putAll(config.originalsWithPrefix("producer."));
     }
 
     private WorkerConfigTransformer initConfigTransformer() {
@@ -499,6 +488,7 @@ public class Worker {
                     internalKeyConverter, internalValueConverter);
             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
+            Map<String, Object> producerProps = producerConfigs(config);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
 
             // Note we pass the configState as it performs dynamic transformations under the covers
@@ -510,15 +500,54 @@ public class Worker {
             log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
             retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics));
+
+            Map<String, Object> consumerProps = consumerConfigs(id, config);
+            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+
             return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
-                    valueConverter, headerConverter, transformationChain, loader, time,
-                    retryWithToleranceOperator);
+                                      valueConverter, headerConverter, transformationChain, consumer, loader, time,
+                                      retryWithToleranceOperator);
         } else {
             log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
             throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
         }
     }
 
+    static Map<String, Object> producerConfigs(WorkerConfig config) {
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        // These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the
+        // worker, but this may compromise the delivery guarantees of Kafka Connect.
+        producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
+        producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
+        producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
+        // User-specified overrides
+        producerProps.putAll(config.originalsWithPrefix("producer."));
+        return producerProps;
+    }
+
+
+    static Map<String, Object> consumerConfigs(ConnectorTaskId id, WorkerConfig config) {
+        // Include any unknown worker configs so consumer configs can be set globally on the worker
+        // and through to the task
+        Map<String, Object> consumerProps = new HashMap<>();
+
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, SinkUtils.consumerGroupId(id.connector()));
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                  Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+        consumerProps.putAll(config.originalsWithPrefix("consumer."));
+        return consumerProps;
+    }
+
     ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         return new ErrorHandlingMetrics(id, metrics);
     }
@@ -532,6 +561,7 @@ public class Worker {
         // check if topic for dead letter queue exists
         String topic = connConfig.dlqTopicName();
         if (topic != null && !topic.isEmpty()) {
+            Map<String, Object> producerProps = producerConfigs(config);
             DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps, errorHandlingMetrics);
             reporters.add(reporter);
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 39e0c6d..a112bfa 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -33,7 +32,6 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
@@ -49,7 +47,6 @@ import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.apache.kafka.connect.util.SinkUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,6 +102,7 @@ class WorkerSinkTask extends WorkerTask {
                           Converter valueConverter,
                           HeaderConverter headerConverter,
                           TransformationChain<SinkRecord> transformationChain,
+                          KafkaConsumer<byte[], byte[]> consumer,
                           ClassLoader loader,
                           Time time,
                           RetryWithToleranceOperator retryWithToleranceOperator) {
@@ -131,13 +129,13 @@ class WorkerSinkTask extends WorkerTask {
         this.commitFailures = 0;
         this.sinkTaskMetricsGroup = new SinkTaskMetricsGroup(id, connectMetrics);
         this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
+        this.consumer = consumer;
     }
 
     @Override
     public void initialize(TaskConfig taskConfig) {
         try {
             this.taskConfig = taskConfig.originalsStrings();
-            this.consumer = createConsumer();
             this.context = new WorkerSinkTaskContext(consumer, this, configState);
         } catch (Throwable t) {
             log.error("{} Task failed initialization and will not be started.", this, t);
@@ -455,31 +453,6 @@ class WorkerSinkTask extends WorkerTask {
         return msgs;
     }
 
-    private KafkaConsumer<byte[], byte[]> createConsumer() {
-        // Include any unknown worker configs so consumer configs can be set globally on the worker
-        // and through to the task
-        Map<String, Object> props = new HashMap<>();
-
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, SinkUtils.consumerGroupId(id.connector()));
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-
-        props.putAll(workerConfig.originalsWithPrefix("consumer."));
-
-        KafkaConsumer<byte[], byte[]> newConsumer;
-        try {
-            newConsumer = new KafkaConsumer<>(props);
-        } catch (Throwable t) {
-            throw new ConnectException("Failed to create consumer", t);
-        }
-
-        return newConsumer;
-    }
-
     private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
         origOffsets.clear();
         for (ConsumerRecord<byte[], byte[]> msg : msgs) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 6d92c34..5d223f4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -352,7 +352,6 @@ public class ErrorHandlingTaskTest {
     }
 
     private void expectInitializeTask() throws Exception {
-        PowerMock.expectPrivate(workerSinkTask, "createConsumer").andReturn(consumer);
         consumer.subscribe(EasyMock.eq(singletonList(TOPIC)), EasyMock.capture(rebalanceListener));
         PowerMock.expectLastCall();
 
@@ -371,11 +370,10 @@ public class ErrorHandlingTaskTest {
 
         TransformationChain<SinkRecord> sinkTransforms = new TransformationChain<>(singletonList(new FaultyPassthrough<SinkRecord>()), retryWithToleranceOperator);
 
-        workerSinkTask = PowerMock.createPartialMock(
-                WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig,
-                ClusterConfigState.EMPTY, metrics, converter, converter,
-                headerConverter, sinkTransforms, pluginLoader, time, retryWithToleranceOperator);
+        workerSinkTask = new WorkerSinkTask(
+            taskId, sinkTask, statusListener, initialState, workerConfig,
+            ClusterConfigState.EMPTY, metrics, converter, converter,
+            headerConverter, sinkTransforms, consumer, pluginLoader, time, retryWithToleranceOperator);
     }
 
     private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 7223c3b..3e047ff 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -162,12 +162,11 @@ public class WorkerSinkTaskTest {
     }
 
     private void createTask(TargetState initialState) {
-        workerTask = PowerMock.createPartialMock(
-                WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics,
-                keyConverter, valueConverter, headerConverter,
-                transformationChain, pluginLoader, time,
-                RetryWithToleranceOperatorTest.NOOP_OPERATOR);
+        workerTask = new WorkerSinkTask(
+            taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics,
+            keyConverter, valueConverter, headerConverter,
+            transformationChain, consumer, pluginLoader, time,
+            RetryWithToleranceOperatorTest.NOOP_OPERATOR);
     }
 
     @After
@@ -1167,7 +1166,6 @@ public class WorkerSinkTaskTest {
 
         createTask(TargetState.PAUSED);
 
-        PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
         consumer.subscribe(EasyMock.capture(topicsRegex), EasyMock.capture(rebalanceListener));
         PowerMock.expectLastCall();
 
@@ -1255,7 +1253,6 @@ public class WorkerSinkTaskTest {
     }
 
     private void expectInitializeTask() throws Exception {
-        PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
         consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));
         PowerMock.expectLastCall();
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index d49c1cd..6e2b01c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -137,12 +137,11 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
         pluginLoader = PowerMock.createMock(PluginClassLoader.class);
         workerConfig = new StandaloneConfig(workerProps);
-        workerTask = PowerMock.createPartialMock(
-                WorkerSinkTask.class, new String[]{"createConsumer"},
+        workerTask = new WorkerSinkTask(
                 taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter,
                 valueConverter, headerConverter,
                 new TransformationChain<>(Collections.emptyList(), RetryWithToleranceOperatorTest.NOOP_OPERATOR),
-                pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR);
+                consumer, pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR);
 
         recordsReturned = 0;
     }
@@ -509,7 +508,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     }
 
     private void expectInitializeTask() throws Exception {
-        PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
 
         consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener));
         PowerMock.expectLastCall();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index f3cacc4..8f15c87 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -17,7 +17,9 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
@@ -87,9 +89,13 @@ public class WorkerTest extends ThreadedTest {
     private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
     private static final String WORKER_ID = "localhost:8083";
 
+    private Map<String, String> workerProps = new HashMap<>();
     private WorkerConfig config;
     private Worker worker;
 
+    private Map<String, String> defaultProducerConfigs = new HashMap<>();
+    private Map<String, String> defaultConsumerConfigs = new HashMap<>();
+
     @Mock
     private Plugins plugins;
     @Mock
@@ -116,8 +122,6 @@ public class WorkerTest extends ThreadedTest {
     @Before
     public void setup() {
         super.setup();
-
-        Map<String, String> workerProps = new HashMap<>();
         workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
         workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
         workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
@@ -128,6 +132,25 @@ public class WorkerTest extends ThreadedTest {
         workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
         config = new StandaloneConfig(workerProps);
 
+        defaultProducerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        defaultProducerConfigs.put(
+            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        defaultProducerConfigs.put(
+            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        defaultProducerConfigs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
+        defaultProducerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
+        defaultProducerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
+        defaultProducerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
+        defaultProducerConfigs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
+
+        defaultConsumerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        defaultConsumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        defaultConsumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        defaultConsumerConfigs
+            .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        defaultConsumerConfigs
+            .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
         PowerMock.mockStatic(Plugins.class);
     }
 
@@ -804,6 +827,45 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testProducerConfigsWithoutOverrides() {
+        assertEquals(defaultProducerConfigs, Worker.producerConfigs(config));
+    }
+
+    @Test
+    public void testProducerConfigsWithOverrides() {
+        Map<String, String> props = new HashMap<>(workerProps);
+        props.put("producer.acks", "-1");
+        props.put("producer.linger.ms", "1000");
+        WorkerConfig configWithOverrides = new StandaloneConfig(props);
+
+        Map<String, String> expectedConfigs = new HashMap<>(defaultProducerConfigs);
+        expectedConfigs.put("acks", "-1");
+        expectedConfigs.put("linger.ms", "1000");
+        assertEquals(expectedConfigs, Worker.producerConfigs(configWithOverrides));
+    }
+
+    @Test
+    public void testConsumerConfigsWithoutOverrides() {
+        Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs);
+        expectedConfigs.put("group.id", "connect-test");
+        assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), config));
+    }
+
+    @Test
+    public void testConsumerConfigsWithOverrides() {
+        Map<String, String> props = new HashMap<>(workerProps);
+        props.put("consumer.auto.offset.reset", "latest");
+        props.put("consumer.max.poll.records", "1000");
+        WorkerConfig configWithOverrides = new StandaloneConfig(props);
+
+        Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs);
+        expectedConfigs.put("group.id", "connect-test");
+        expectedConfigs.put("auto.offset.reset", "latest");
+        expectedConfigs.put("max.poll.records", "1000");
+        assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides));
+    }
+
     private void assertStatistics(Worker worker, int connectors, int tasks) {
         MetricGroup workerMetrics = worker.workerMetricsGroup().metricGroup();
         assertEquals(connectors, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-count"), 0.0001d);

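For reference, the override behavior exercised by the new WorkerTest cases above can
be sketched as a standalone snippet. The class and values here are hypothetical; the
point is that AbstractConfig.originalsWithPrefix("producer.") returns the worker's
"producer."-prefixed keys with the prefix stripped, and merging them last lets user
overrides win over the worker's hard-coded defaults:

    import java.util.HashMap;
    import java.util.Map;

    public class OverrideSketch {
        public static void main(String[] args) {
            // Defaults hard-coded by Worker.producerConfigs (subset shown)
            Map<String, Object> producerProps = new HashMap<>();
            producerProps.put("acks", "all");
            producerProps.put("max.in.flight.requests.per.connection", "1");

            // A worker config entry such as producer.acks=-1 arrives here with
            // the "producer." prefix already stripped
            Map<String, Object> overrides = new HashMap<>();
            overrides.put("acks", "-1");
            overrides.put("linger.ms", "1000");

            // Merged last, so the override replaces the default acks=all
            producerProps.putAll(overrides);
            System.out.println(producerProps.get("acks"));      // prints -1
            System.out.println(producerProps.get("linger.ms")); // prints 1000
        }
    }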
