From commits-return-10808-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Fri Nov 30 08:39:12 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 81D5F180671 for ; Fri, 30 Nov 2018 08:39:11 +0100 (CET) Received: (qmail 97146 invoked by uid 500); 30 Nov 2018 07:39:10 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 97134 invoked by uid 99); 30 Nov 2018 07:39:10 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 30 Nov 2018 07:39:10 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 8A920829A1; Fri, 30 Nov 2018 07:39:09 +0000 (UTC) Date: Fri, 30 Nov 2018 07:39:08 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: KAFKA-7551: Refactor to create producer & consumer in the worker MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <154356354783.1437.6226228106415224682@gitbox.apache.org> From: ewencp@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: 4712a3641619e86b8e6d901355088f6ae06e9f37 X-Git-Newrev: ace4dd00566afb7d04235bbbcc50097191af0fec X-Git-Rev: ace4dd00566afb7d04235bbbcc50097191af0fec X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
ewencp pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ace4dd0 KAFKA-7551: Refactor to create producer & consumer in the worker ace4dd0 is described below commit ace4dd00566afb7d04235bbbcc50097191af0fec Author: Magesh Nandakumar AuthorDate: Thu Nov 29 23:38:50 2018 -0800 KAFKA-7551: Refactor to create producer & consumer in the worker This is a minor refactoring that moves the creation of the producer and consumer into the Worker. Currently, the consumer is created in the WorkerSinkTask. This should not affect any functionality; it just makes the code structure easier to understand. Author: Magesh Nandakumar Reviewers: Ryanne Dolan , Randall Hauch , Robert Yokota , Ewen Cheslack-Postava Closes #5842 from mageshn/KAFKA-7551 --- .../org/apache/kafka/connect/runtime/Worker.java | 62 ++++++++++++++------ .../kafka/connect/runtime/WorkerSinkTask.java | 31 +--------- .../connect/runtime/ErrorHandlingTaskTest.java | 10 ++-- .../kafka/connect/runtime/WorkerSinkTaskTest.java | 13 ++--- .../runtime/WorkerSinkTaskThreadedTest.java | 6 +- .../apache/kafka/connect/runtime/WorkerTest.java | 66 +++++++++++++++++++++- 6 files changed, 123 insertions(+), 65 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 81a165c..673bd4e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.MetricName; @@ -50,6 +52,7 @@ 
import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; import org.apache.kafka.connect.storage.OffsetStorageWriter; import org.apache.kafka.connect.util.ConnectorTaskId; +import org.apache.kafka.connect.util.SinkUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,7 +92,6 @@ public class Worker { private final Converter internalKeyConverter; private final Converter internalValueConverter; private final OffsetBackingStore offsetBackingStore; - private final Map producerProps; private final ConcurrentMap connectors = new ConcurrentHashMap<>(); private final ConcurrentMap tasks = new ConcurrentHashMap<>(); @@ -129,19 +131,6 @@ public class Worker { this.workerConfigTransformer = initConfigTransformer(); - producerProps = new HashMap<>(); - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - // These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the - // worker, but this may compromise the delivery guarantees of Kafka Connect. 
- producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); - producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE)); - producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); - producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); - producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); - // User-specified overrides - producerProps.putAll(config.originalsWithPrefix("producer.")); } private WorkerConfigTransformer initConfigTransformer() { @@ -499,6 +488,7 @@ public class Worker { internalKeyConverter, internalValueConverter); OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(), internalKeyConverter, internalValueConverter); + Map producerProps = producerConfigs(config); KafkaProducer producer = new KafkaProducer<>(producerProps); // Note we pass the configState as it performs dynamic transformations under the covers @@ -510,15 +500,54 @@ public class Worker { log.info("Initializing: {}", transformationChain); SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings()); retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics)); + + Map consumerProps = consumerConfigs(id, config); + KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter, - valueConverter, headerConverter, transformationChain, loader, time, - retryWithToleranceOperator); + valueConverter, headerConverter, transformationChain, consumer, loader, time, + retryWithToleranceOperator); } else { log.error("Tasks must be a subclass of either SourceTask or SinkTask", task); throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask"); } } + static Map producerConfigs(WorkerConfig config) { + Map producerProps = 
new HashMap<>(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + // These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the + // worker, but this may compromise the delivery guarantees of Kafka Connect. + producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); + producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE)); + producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); + producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); + producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); + // User-specified overrides + producerProps.putAll(config.originalsWithPrefix("producer.")); + return producerProps; + } + + + static Map consumerConfigs(ConnectorTaskId id, WorkerConfig config) { + // Include any unknown worker configs so consumer configs can be set globally on the worker + // and through to the task + Map consumerProps = new HashMap<>(); + + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, SinkUtils.consumerGroupId(id.connector())); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer"); + + consumerProps.putAll(config.originalsWithPrefix("consumer.")); + return consumerProps; + } + ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) { return new ErrorHandlingMetrics(id, metrics); } @@ -532,6 +561,7 @@ public class Worker { // check if topic for dead letter queue exists String topic = connConfig.dlqTopicName(); if (topic != null && !topic.isEmpty()) { + Map producerProps = producerConfigs(config); DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps, errorHandlingMetrics); reporters.add(reporter); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 39e0c6d..a112bfa 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.runtime; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -33,7 +32,6 @@ import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; @@ -49,7 +47,6 @@ import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.util.ConnectUtils; import 
org.apache.kafka.connect.util.ConnectorTaskId; -import org.apache.kafka.connect.util.SinkUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,6 +102,7 @@ class WorkerSinkTask extends WorkerTask { Converter valueConverter, HeaderConverter headerConverter, TransformationChain transformationChain, + KafkaConsumer consumer, ClassLoader loader, Time time, RetryWithToleranceOperator retryWithToleranceOperator) { @@ -131,13 +129,13 @@ class WorkerSinkTask extends WorkerTask { this.commitFailures = 0; this.sinkTaskMetricsGroup = new SinkTaskMetricsGroup(id, connectMetrics); this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno); + this.consumer = consumer; } @Override public void initialize(TaskConfig taskConfig) { try { this.taskConfig = taskConfig.originalsStrings(); - this.consumer = createConsumer(); this.context = new WorkerSinkTaskContext(consumer, this, configState); } catch (Throwable t) { log.error("{} Task failed initialization and will not be started.", this, t); @@ -455,31 +453,6 @@ class WorkerSinkTask extends WorkerTask { return msgs; } - private KafkaConsumer createConsumer() { - // Include any unknown worker configs so consumer configs can be set globally on the worker - // and through to the task - Map props = new HashMap<>(); - - props.put(ConsumerConfig.GROUP_ID_CONFIG, SinkUtils.consumerGroupId(id.connector())); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - - props.putAll(workerConfig.originalsWithPrefix("consumer.")); - - KafkaConsumer newConsumer; - try { - 
newConsumer = new KafkaConsumer<>(props); - } catch (Throwable t) { - throw new ConnectException("Failed to create consumer", t); - } - - return newConsumer; - } - private void convertMessages(ConsumerRecords msgs) { origOffsets.clear(); for (ConsumerRecord msg : msgs) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index 6d92c34..5d223f4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -352,7 +352,6 @@ public class ErrorHandlingTaskTest { } private void expectInitializeTask() throws Exception { - PowerMock.expectPrivate(workerSinkTask, "createConsumer").andReturn(consumer); consumer.subscribe(EasyMock.eq(singletonList(TOPIC)), EasyMock.capture(rebalanceListener)); PowerMock.expectLastCall(); @@ -371,11 +370,10 @@ public class ErrorHandlingTaskTest { TransformationChain sinkTransforms = new TransformationChain<>(singletonList(new FaultyPassthrough()), retryWithToleranceOperator); - workerSinkTask = PowerMock.createPartialMock( - WorkerSinkTask.class, new String[]{"createConsumer"}, - taskId, sinkTask, statusListener, initialState, workerConfig, - ClusterConfigState.EMPTY, metrics, converter, converter, - headerConverter, sinkTransforms, pluginLoader, time, retryWithToleranceOperator); + workerSinkTask = new WorkerSinkTask( + taskId, sinkTask, statusListener, initialState, workerConfig, + ClusterConfigState.EMPTY, metrics, converter, converter, + headerConverter, sinkTransforms, consumer, pluginLoader, time, retryWithToleranceOperator); } private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 7223c3b..3e047ff 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -162,12 +162,11 @@ public class WorkerSinkTaskTest { } private void createTask(TargetState initialState) { - workerTask = PowerMock.createPartialMock( - WorkerSinkTask.class, new String[]{"createConsumer"}, - taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, - keyConverter, valueConverter, headerConverter, - transformationChain, pluginLoader, time, - RetryWithToleranceOperatorTest.NOOP_OPERATOR); + workerTask = new WorkerSinkTask( + taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, + keyConverter, valueConverter, headerConverter, + transformationChain, consumer, pluginLoader, time, + RetryWithToleranceOperatorTest.NOOP_OPERATOR); } @After @@ -1167,7 +1166,6 @@ public class WorkerSinkTaskTest { createTask(TargetState.PAUSED); - PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer); consumer.subscribe(EasyMock.capture(topicsRegex), EasyMock.capture(rebalanceListener)); PowerMock.expectLastCall(); @@ -1255,7 +1253,6 @@ public class WorkerSinkTaskTest { } private void expectInitializeTask() throws Exception { - PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer); consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener)); PowerMock.expectLastCall(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index d49c1cd..6e2b01c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -137,12 +137,11 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); pluginLoader = PowerMock.createMock(PluginClassLoader.class); workerConfig = new StandaloneConfig(workerProps); - workerTask = PowerMock.createPartialMock( - WorkerSinkTask.class, new String[]{"createConsumer"}, + workerTask = new WorkerSinkTask( taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter, valueConverter, headerConverter, new TransformationChain<>(Collections.emptyList(), RetryWithToleranceOperatorTest.NOOP_OPERATOR), - pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR); + consumer, pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR); recordsReturned = 0; } @@ -509,7 +508,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { } private void expectInitializeTask() throws Exception { - PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer); consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener)); PowerMock.expectLastCall(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index f3cacc4..8f15c87 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -17,7 +17,9 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.Configurable; import 
org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -87,9 +89,13 @@ public class WorkerTest extends ThreadedTest { private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0); private static final String WORKER_ID = "localhost:8083"; + private Map workerProps = new HashMap<>(); private WorkerConfig config; private Worker worker; + private Map defaultProducerConfigs = new HashMap<>(); + private Map defaultConsumerConfigs = new HashMap<>(); + @Mock private Plugins plugins; @Mock @@ -116,8 +122,6 @@ public class WorkerTest extends ThreadedTest { @Before public void setup() { super.setup(); - - Map workerProps = new HashMap<>(); workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); @@ -128,6 +132,25 @@ public class WorkerTest extends ThreadedTest { workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); config = new StandaloneConfig(workerProps); + defaultProducerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + defaultProducerConfigs.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + defaultProducerConfigs.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + defaultProducerConfigs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); + defaultProducerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE)); + defaultProducerConfigs.put(ProducerConfig.ACKS_CONFIG, "all"); + defaultProducerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); + defaultProducerConfigs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.toString(Integer.MAX_VALUE)); + + defaultConsumerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + defaultConsumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + defaultConsumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + defaultConsumerConfigs + .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + defaultConsumerConfigs + .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + PowerMock.mockStatic(Plugins.class); } @@ -804,6 +827,45 @@ public class WorkerTest extends ThreadedTest { PowerMock.verifyAll(); } + @Test + public void testProducerConfigsWithoutOverrides() { + assertEquals(defaultProducerConfigs, Worker.producerConfigs(config)); + } + + @Test + public void testProducerConfigsWithOverrides() { + Map props = new HashMap<>(workerProps); + props.put("producer.acks", "-1"); + props.put("producer.linger.ms", "1000"); + WorkerConfig configWithOverrides = new StandaloneConfig(props); + + Map expectedConfigs = new HashMap<>(defaultProducerConfigs); + expectedConfigs.put("acks", "-1"); + expectedConfigs.put("linger.ms", "1000"); + assertEquals(expectedConfigs, Worker.producerConfigs(configWithOverrides)); + } + + @Test + public void testConsumerConfigsWithoutOverrides() { + Map expectedConfigs = new HashMap<>(defaultConsumerConfigs); + expectedConfigs.put("group.id", "connect-test"); + assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), config)); + } + + @Test + public void testConsumerConfigsWithOverrides() { + Map props = new HashMap<>(workerProps); + props.put("consumer.auto.offset.reset", "latest"); + props.put("consumer.max.poll.records", "1000"); + WorkerConfig configWithOverrides = new StandaloneConfig(props); + + Map expectedConfigs = new HashMap<>(defaultConsumerConfigs); + expectedConfigs.put("group.id", "connect-test"); + 
expectedConfigs.put("auto.offset.reset", "latest"); + expectedConfigs.put("max.poll.records", "1000"); + assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides)); + } + private void assertStatistics(Worker worker, int connectors, int tasks) { MetricGroup workerMetrics = worker.workerMetricsGroup().metricGroup(); assertEquals(connectors, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-count"), 0.0001d);