From commits-return-24935-archive-asf-public=cust-asf.ponee.io@pulsar.apache.org Tue Mar 19 13:10:23 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id DAB40180626 for ; Tue, 19 Mar 2019 14:10:21 +0100 (CET) Received: (qmail 71113 invoked by uid 500); 19 Mar 2019 13:10:21 -0000 Mailing-List: contact commits-help@pulsar.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.apache.org Delivered-To: mailing list commits@pulsar.apache.org Received: (qmail 71104 invoked by uid 99); 19 Mar 2019 13:10:21 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Mar 2019 13:10:20 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 59C6482F2C; Tue, 19 Mar 2019 13:10:20 +0000 (UTC) Date: Tue, 19 Mar 2019 13:10:20 +0000 To: "commits@pulsar.apache.org" Subject: [pulsar] branch master updated: hide kafka-connecter details for easy use debezium connector (#3825) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <155300102010.21393.2133189989432796698@gitbox.apache.org> From: sijie@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: pulsar X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 7b2ccd197443c8654d0d5b83bbeb5c1eda82276c X-Git-Newrev: 8fc4f3c1288e709993ff411931794313e885047b X-Git-Rev: 8fc4f3c1288e709993ff411931794313e885047b X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8fc4f3c hide kafka-connecter details for easy use debezium connector (#3825) 8fc4f3c is described below commit 8fc4f3c1288e709993ff411931794313e885047b Author: Jia Zhai AuthorDate: Tue Mar 19 21:10:15 2019 +0800 hide kafka-connecter details for easy use debezium connector (#3825) currently we explored too much internal config for debezium connector, this PR is to hide some details and make the config easier. expected pass integration tests. --- distribution/io/src/assemble/io.xml | 1 + pulsar-io/debezium/{ => core}/pom.xml | 12 +-- .../pulsar/io/debezium/PulsarDatabaseHistory.java | 0 .../io/debezium/PulsarDatabaseHistoryTest.java | 0 pulsar-io/debezium/mysql/pom.xml | 59 +++++++++++ .../io/debezium/mysql/DebeziumMysqlSource.java | 110 +++++++++++++++++++++ .../resources/META-INF/services/pulsar-io.yaml | 22 +++++ .../resources/debezium-mysql-source-config.yaml | 20 +--- pulsar-io/debezium/pom.xml | 75 +------------- pulsar-io/kafka-connect-adaptor/pom.xml | 6 -- .../io/kafka/connect/KafkaConnectSource.java | 7 +- .../io/kafka/connect/PulsarKafkaWorkerConfig.java | 14 ++- .../containers/DebeziumMySQLContainer.java | 2 +- .../integration/functions/PulsarFunctionsTest.java | 7 +- .../integration/io/DebeziumMySqlSourceTester.java | 28 +++--- 15 files changed, 245 insertions(+), 118 deletions(-) diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml index 186e391..339522f 100644 --- a/distribution/io/src/assemble/io.xml +++ b/distribution/io/src/assemble/io.xml @@ -64,5 +64,6 @@ ${basedir}/../../pulsar-io/canal/target/pulsar-io-canal-${project.version}.nar ${basedir}/../../pulsar-io/netty/target/pulsar-io-netty-${project.version}.nar ${basedir}/../../pulsar-io/mongo/target/pulsar-io-mongo-${project.version}.nar + ${basedir}/../../pulsar-io/debezium/mysql/target/pulsar-io-debezium-mysql-${project.version}.nar diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/core/pom.xml similarity index 91% copy from pulsar-io/debezium/pom.xml copy to pulsar-io/debezium/core/pom.xml index ac6a5d9..23ca973 100644 --- a/pulsar-io/debezium/pom.xml +++ b/pulsar-io/debezium/core/pom.xml @@ -23,12 +23,12 @@ 4.0.0 org.apache.pulsar - pulsar-io + pulsar-io-debezium 2.4.0-SNAPSHOT - pulsar-io-debezium - Pulsar IO :: Debezium + pulsar-io-debezium-core + Pulsar IO :: Debezium :: Core @@ -45,9 +45,9 @@ - io.debezium - debezium-connector-mysql - ${debezium.version} + ${project.groupId} + pulsar-io-kafka-connect-adaptor + ${project.version} diff --git a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java similarity index 100% rename from pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java rename to pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java diff --git a/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java similarity index 100% rename from pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java rename to pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java diff --git a/pulsar-io/debezium/mysql/pom.xml b/pulsar-io/debezium/mysql/pom.xml new file mode 100644 index 0000000..9754165 --- /dev/null +++ b/pulsar-io/debezium/mysql/pom.xml @@ -0,0 +1,59 @@ + + + 4.0.0 + + org.apache.pulsar + pulsar-io-debezium + 2.4.0-SNAPSHOT + + + pulsar-io-debezium-mysql + Pulsar IO :: Debezium :: mysql + + + + + ${project.groupId} + pulsar-io-debezium-core + ${project.version} + + + + io.debezium + debezium-connector-mysql + ${debezium.version} + + + + + + + + + org.apache.nifi + nifi-nar-maven-plugin + + + + + diff --git a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java b/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java new file mode 100644 index 0000000..e8fe0c3 --- /dev/null +++ b/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.debezium.mysql; + +import java.util.Map; + +import io.debezium.connector.mysql.MySqlConnectorConfig; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.connect.runtime.TaskConfig; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.io.core.SourceContext; +import org.apache.pulsar.io.debezium.PulsarDatabaseHistory; +import org.apache.pulsar.io.kafka.connect.KafkaConnectSource; +import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig; + +/** + * A pulsar source that runs + */ +@Slf4j +public class DebeziumMysqlSource extends KafkaConnectSource { + static private final String DEFAULT_TASK = "io.debezium.connector.mysql.MySqlConnectorTask"; + static private final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"; + static private final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"; + static private final String DEFAULT_OFFSET_TOPIC = "debezium-mysql-offset-topic"; + static private final String DEFAULT_HISTORY_TOPIC = "debezium-mysql-history-topic"; + + private static void throwExceptionIfConfigNotMatch(Map config, + String key, + String value) throws IllegalArgumentException { + Object orig = config.get(key); + if (orig == null) { + config.put(key, value); + return; + } + + // throw exception if value not match + if (!orig.equals(value)) { + throw new IllegalArgumentException("Expected " + value + " but has " + orig); + } + } + + private static void setConfigIfNull(Map config, String key, String value) { + Object orig = config.get(key); + if (orig == null) { + config.put(key, value); + } + } + + // namespace: tenant/namespace + private static String topicNamespace(SourceContext sourceContext) { + String tenant = sourceContext.getTenant(); + String namespace = sourceContext.getNamespace(); + + return (StringUtils.isEmpty(tenant) ? TopicName.PUBLIC_TENANT : tenant) + "/" + + (StringUtils.isEmpty(namespace) ? TopicName.DEFAULT_NAMESPACE : namespace); + } + + @Override + public void open(Map config, SourceContext sourceContext) throws Exception { + // connector task + throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK); + + // key.converter + setConfigIfNull(config, PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER); + // value.converter + setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER); + + // database.history implementation class + setConfigIfNull(config, MySqlConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY); + + // database.history.pulsar.service.url, this is set as the value of pulsar.service.url if null. + String serviceUrl = (String) config.get(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG); + if (serviceUrl == null) { + throw new IllegalArgumentException("Pulsar service URL not provided."); + } + setConfigIfNull(config, PulsarDatabaseHistory.SERVICE_URL.name(), serviceUrl); + + String topicNamespace = topicNamespace(sourceContext); + // topic.namespace + setConfigIfNull(config, PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, topicNamespace); + + String sourceName = sourceContext.getSourceName(); + // database.history.pulsar.topic: history topic name + setConfigIfNull(config, PulsarDatabaseHistory.TOPIC.name(), + topicNamespace + "/" + sourceName + "-" + DEFAULT_HISTORY_TOPIC); + // offset.storage.topic: offset topic name + setConfigIfNull(config, PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, + topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC); + + super.open(config, sourceContext); + } + +} diff --git a/pulsar-io/debezium/mysql/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/debezium/mysql/src/main/resources/META-INF/services/pulsar-io.yaml new file mode 100644 index 0000000..288a0df --- /dev/null +++ b/pulsar-io/debezium/mysql/src/main/resources/META-INF/services/pulsar-io.yaml @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +name: debezium-mysql +description: Debezium MySql Source +sourceClass: org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource diff --git a/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml similarity index 64% rename from pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml rename to pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml index b0361f9..7056bc1 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml +++ b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml @@ -19,17 +19,13 @@ tenant: "test" namespace: "test-namespace" -name: "debezium-kafka-source" -topicName: "kafka-connect-topic" -archive: "connectors/pulsar-io-kafka-connect-adaptor-2.3.0-SNAPSHOT.nar" +name: "debezium-mysql-source" +topicName: "debezium-mysql-topic" +archive: "connectors/pulsar-io-debezium-mysql-2.4.0-SNAPSHOT.nar" -##autoAck: true parallelism: 1 configs: - ## sourceTask - task.class: "io.debezium.connector.mysql.MySqlConnectorTask" - ## config for mysql, docker image: debezium/example-mysql:0.8 database.hostname: "localhost" database.port: "3306" @@ -39,15 +35,9 @@ configs: database.server.name: "dbserver1" database.whitelist: "inventory" - database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory" - database.history.pulsar.topic: "history-topic" - database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" - ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG - key.converter: "org.apache.kafka.connect.json.JsonConverter" - value.converter: "org.apache.kafka.connect.json.JsonConverter" ## PULSAR_SERVICE_URL_CONFIG pulsar.service.url: "pulsar://127.0.0.1:6650" - ## OFFSET_STORAGE_TOPIC_CONFIG - offset.storage.topic: "offset-topic" + database.history.pulsar.topic: "mysql-history-topic" + offset.storage.topic: "mysql-offset-topic" diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml index ac6a5d9..37a50cc 100644 --- a/pulsar-io/debezium/pom.xml +++ b/pulsar-io/debezium/pom.xml @@ -21,6 +21,7 @@ 4.0.0 + pom org.apache.pulsar pulsar-io @@ -30,75 +31,9 @@ pulsar-io-debezium Pulsar IO :: Debezium - - - - ${project.groupId} - pulsar-io-core - ${project.version} - - - - io.debezium - debezium-core - ${debezium.version} - - - - io.debezium - debezium-connector-mysql - ${debezium.version} - - - - org.apache.kafka - kafka_${scala.binary.version} - ${kafka-client.version} - - - - org.apache.kafka - connect-runtime - ${kafka-client.version} - - - - ${project.groupId} - pulsar-client-original - ${project.version} - - - - ${project.groupId} - pulsar-broker - ${project.version} - test - - - - ${project.groupId} - managed-ledger-original - ${project.version} - test-jar - test - - - - ${project.groupId} - pulsar-zookeeper-utils - ${project.version} - test-jar - test - - - - ${project.groupId} - pulsar-broker - ${project.version} - test - test-jar - - - + + core + mysql + diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml index cee7ec8..da5c620 100644 --- a/pulsar-io/kafka-connect-adaptor/pom.xml +++ b/pulsar-io/kafka-connect-adaptor/pom.xml @@ -39,12 +39,6 @@ - ${project.groupId} - pulsar-io-debezium - ${project.version} - - - org.apache.kafka kafka_${scala.binary.version} ${kafka-client.version} diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java index a164703..3ccbb4b 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.io.kafka.connect; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG; + import java.util.Base64; import java.util.Collections; import java.util.HashMap; @@ -66,6 +68,7 @@ public class KafkaConnectSource implements Source> { private CompletableFuture flushFuture; private OffsetBackingStore offsetStore; private OffsetStorageReader offsetReader; + private String topicNamespace; @Getter private OffsetStorageWriter offsetWriter; // number of outstandingRecords that have been polled but not been acked @@ -86,6 +89,8 @@ public class KafkaConnectSource implements Source> { .getDeclaredConstructor() .newInstance(); + topicNamespace = stringConfig.get(TOPIC_NAMESPACE_CONFIG); + // initialize the key and value converter keyConverter = ((Class)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG))) .asSubclass(Converter.class) @@ -193,7 +198,7 @@ public class KafkaConnectSource implements Source> { .stream() .map(e -> e.getKey() + "=" + e.getValue()) .collect(Collectors.joining(","))); - this.destinationTopic = Optional.of(srcRecord.topic()); + this.destinationTopic = Optional.of(topicNamespace + "/" + srcRecord.topic()); } @Override diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java index d00f776..624c59a 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java @@ -44,6 +44,12 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig { public static final String PULSAR_SERVICE_URL_CONFIG = "pulsar.service.url"; private static final String PULSAR_SERVICE_URL_CONFIG_DOC = "pulsar service url"; + /** + * topic.namespace + */ + public static final String TOPIC_NAMESPACE_CONFIG = "topic.namespace"; + private static final String TOPIC_NAMESPACE_CONFIG_DOC = "namespace of topic name to store the output topics"; + static { CONFIG = new ConfigDef() .define(OFFSET_STORAGE_TOPIC_CONFIG, @@ -53,10 +59,14 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig { .define(PULSAR_SERVICE_URL_CONFIG, Type.STRING, Importance.HIGH, - PULSAR_SERVICE_URL_CONFIG_DOC); + PULSAR_SERVICE_URL_CONFIG_DOC) + .define(TOPIC_NAMESPACE_CONFIG, + Type.STRING, + "public/default", + Importance.HIGH, + TOPIC_NAMESPACE_CONFIG_DOC); } - public PulsarKafkaWorkerConfig(Map props) { super(CONFIG, props); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java index 2d0613e..6d87dd6 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java @@ -23,7 +23,7 @@ import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; public class DebeziumMySQLContainer extends ChaosContainer { - public static final String NAME = "mysql"; + public static final String NAME = "debezium-mysql-example"; static final Integer[] PORTS = { 3306 }; private static final String IMAGE_NAME = "debezium/example-mysql:0.8"; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 18eaba9..9c44a05 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -1163,7 +1163,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { final String tenant = TopicName.PUBLIC_TENANT; final String namespace = TopicName.DEFAULT_NAMESPACE; final String outputTopicName = "debe-output-topic-name"; - final String consumeTopicName = "dbserver1.inventory.products"; + final String consumeTopicName = "public/default/dbserver1.inventory.products"; final String sourceName = "test-source-connector-" + functionRuntimeType + "-name-" + randomName(8); @@ -1182,6 +1182,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { .subscriptionType(SubscriptionType.Exclusive) .subscribe(); + @Cleanup DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster); // setup debezium mysql server @@ -1204,15 +1205,13 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages); // validate the source result - sourceTester.validateSourceResult(consumer, null); + sourceTester.validateSourceResult(consumer, 9); // delete the source deleteSource(tenant, namespace, sourceName); // get source info (source should be deleted) getSourceInfoNotFound(tenant, namespace, sourceName); - - pulsarCluster.stopService("mysql", sourceTester.getDebeziumMySqlContainer()); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java index b122ccc..ffecbc0 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.tests.integration.io; +import java.io.Closeable; import java.util.Map; import java.util.concurrent.TimeUnit; import lombok.Getter; @@ -39,9 +40,9 @@ import org.testng.Assert; * which is a MySQL database server preconfigured with an inventory database. */ @Slf4j -public class DebeziumMySqlSourceTester extends SourceTester { +public class DebeziumMySqlSourceTester extends SourceTester implements Closeable { - private static final String NAME = "kafka-connect-adaptor"; + private static final String NAME = "debezium-mysql"; private final String pulsarServiceUrl; @@ -55,28 +56,21 @@ public class DebeziumMySqlSourceTester extends SourceTester consumer, Map kvs) throws Exception { + public void validateSourceResult(Consumer consumer, int number) throws Exception { int recordsNumber = 0; Message msg = consumer.receive(2, TimeUnit.SECONDS); while(msg != null) { @@ -101,7 +95,15 @@ public class DebeziumMySqlSourceTester extends SourceTester