pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: hide kafka-connecter details for easy use debezium connector (#3825)
Date Tue, 19 Mar 2019 13:10:20 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fc4f3c  hide kafka-connecter details for easy use debezium connector (#3825)
8fc4f3c is described below

commit 8fc4f3c1288e709993ff411931794313e885047b
Author: Jia Zhai <zhaijia@apache.org>
AuthorDate: Tue Mar 19 21:10:15 2019 +0800

    hide kafka-connecter details for easy use debezium connector (#3825)
    
    currently we explored too much internal config for debezium connector, this PR is to hide
some details and make the config easier.
    expected pass integration tests.
---
 distribution/io/src/assemble/io.xml                |   1 +
 pulsar-io/debezium/{ => core}/pom.xml              |  12 +--
 .../pulsar/io/debezium/PulsarDatabaseHistory.java  |   0
 .../io/debezium/PulsarDatabaseHistoryTest.java     |   0
 pulsar-io/debezium/mysql/pom.xml                   |  59 +++++++++++
 .../io/debezium/mysql/DebeziumMysqlSource.java     | 110 +++++++++++++++++++++
 .../resources/META-INF/services/pulsar-io.yaml     |  22 +++++
 .../resources/debezium-mysql-source-config.yaml    |  20 +---
 pulsar-io/debezium/pom.xml                         |  75 +-------------
 pulsar-io/kafka-connect-adaptor/pom.xml            |   6 --
 .../io/kafka/connect/KafkaConnectSource.java       |   7 +-
 .../io/kafka/connect/PulsarKafkaWorkerConfig.java  |  14 ++-
 .../containers/DebeziumMySQLContainer.java         |   2 +-
 .../integration/functions/PulsarFunctionsTest.java |   7 +-
 .../integration/io/DebeziumMySqlSourceTester.java  |  28 +++---
 15 files changed, 245 insertions(+), 118 deletions(-)

diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index 186e391..339522f 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -64,5 +64,6 @@
     <file><source>${basedir}/../../pulsar-io/canal/target/pulsar-io-canal-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/netty/target/pulsar-io-netty-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/mongo/target/pulsar-io-mongo-${project.version}.nar</source></file>
+    <file><source>${basedir}/../../pulsar-io/debezium/mysql/target/pulsar-io-debezium-mysql-${project.version}.nar</source></file>
   </files>
 </assembly>
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/core/pom.xml
similarity index 91%
copy from pulsar-io/debezium/pom.xml
copy to pulsar-io/debezium/core/pom.xml
index ac6a5d9..23ca973 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/core/pom.xml
@@ -23,12 +23,12 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar-io</artifactId>
+    <artifactId>pulsar-io-debezium</artifactId>
     <version>2.4.0-SNAPSHOT</version>
   </parent>
 
-  <artifactId>pulsar-io-debezium</artifactId>
-  <name>Pulsar IO :: Debezium</name>
+  <artifactId>pulsar-io-debezium-core</artifactId>
+  <name>Pulsar IO :: Debezium :: Core</name>
 
   <dependencies>
 
@@ -45,9 +45,9 @@
     </dependency>
 
     <dependency>
-      <groupId>io.debezium</groupId>
-      <artifactId>debezium-connector-mysql</artifactId>
-      <version>${debezium.version}</version>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-kafka-connect-adaptor</artifactId>
+      <version>${project.version}</version>
     </dependency>
 
     <dependency>
diff --git a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
similarity index 100%
rename from pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
rename to pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
diff --git a/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
similarity index 100%
rename from pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
rename to pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
diff --git a/pulsar-io/debezium/mysql/pom.xml b/pulsar-io/debezium/mysql/pom.xml
new file mode 100644
index 0000000..9754165
--- /dev/null
+++ b/pulsar-io/debezium/mysql/pom.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-io-debezium</artifactId>
+    <version>2.4.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-io-debezium-mysql</artifactId>
+  <name>Pulsar IO :: Debezium :: mysql</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-debezium-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.debezium</groupId>
+      <artifactId>debezium-connector-mysql</artifactId>
+      <version>${debezium.version}</version>
+    </dependency>
+
+  </dependencies>
+
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
b/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
new file mode 100644
index 0000000..e8fe0c3
--- /dev/null
+++ b/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.debezium.mysql;
+
+import java.util.Map;
+
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.debezium.PulsarDatabaseHistory;
+import org.apache.pulsar.io.kafka.connect.KafkaConnectSource;
+import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;
+
+/**
+ * A pulsar source that runs
+ */
+@Slf4j
+public class DebeziumMysqlSource extends KafkaConnectSource {
+    static private final String DEFAULT_TASK = "io.debezium.connector.mysql.MySqlConnectorTask";
+    static private final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
+    static private final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarDatabaseHistory";
+    static private final String DEFAULT_OFFSET_TOPIC = "debezium-mysql-offset-topic";
+    static private final String DEFAULT_HISTORY_TOPIC = "debezium-mysql-history-topic";
+
+    private static void throwExceptionIfConfigNotMatch(Map<String, Object> config,
+                                                       String key,
+                                                       String value) throws IllegalArgumentException
{
+        Object orig = config.get(key);
+        if (orig == null) {
+            config.put(key, value);
+            return;
+        }
+
+        // throw exception if value not match
+        if (!orig.equals(value)) {
+            throw new IllegalArgumentException("Expected " + value + " but has " + orig);
+        }
+    }
+
+    private static void setConfigIfNull(Map<String, Object> config, String key, String
value) {
+        Object orig = config.get(key);
+        if (orig == null) {
+            config.put(key, value);
+        }
+    }
+
+    // namespace: tenant/namespace
+    private static String topicNamespace(SourceContext sourceContext) {
+        String tenant = sourceContext.getTenant();
+        String namespace = sourceContext.getNamespace();
+
+        return (StringUtils.isEmpty(tenant) ? TopicName.PUBLIC_TENANT : tenant) + "/" +
+            (StringUtils.isEmpty(namespace) ? TopicName.DEFAULT_NAMESPACE : namespace);
+    }
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws
Exception {
+        // connector task
+        throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK);
+
+        // key.converter
+        setConfigIfNull(config, PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
+        // value.converter
+        setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
+
+        // database.history implementation class
+        setConfigIfNull(config, MySqlConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY);
+
+        // database.history.pulsar.service.url, this is set as the value of pulsar.service.url
if null.
+        String serviceUrl = (String) config.get(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG);
+        if (serviceUrl == null) {
+            throw new IllegalArgumentException("Pulsar service URL not provided.");
+        }
+        setConfigIfNull(config, PulsarDatabaseHistory.SERVICE_URL.name(), serviceUrl);
+
+        String topicNamespace = topicNamespace(sourceContext);
+        // topic.namespace
+        setConfigIfNull(config, PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, topicNamespace);
+
+        String sourceName = sourceContext.getSourceName();
+        // database.history.pulsar.topic: history topic name
+        setConfigIfNull(config, PulsarDatabaseHistory.TOPIC.name(),
+            topicNamespace + "/" + sourceName + "-" + DEFAULT_HISTORY_TOPIC);
+        // offset.storage.topic: offset topic name
+        setConfigIfNull(config, PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
+            topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC);
+
+        super.open(config, sourceContext);
+    }
+
+}
diff --git a/pulsar-io/debezium/mysql/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/debezium/mysql/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..288a0df
--- /dev/null
+++ b/pulsar-io/debezium/mysql/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: debezium-mysql
+description: Debezium MySql Source
+sourceClass: org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml
b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
similarity index 64%
rename from pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml
rename to pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
index b0361f9..7056bc1 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml
+++ b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
@@ -19,17 +19,13 @@
 
 tenant: "test"
 namespace: "test-namespace"
-name: "debezium-kafka-source"
-topicName: "kafka-connect-topic"
-archive: "connectors/pulsar-io-kafka-connect-adaptor-2.3.0-SNAPSHOT.nar"
+name: "debezium-mysql-source"
+topicName: "debezium-mysql-topic"
+archive: "connectors/pulsar-io-debezium-mysql-2.4.0-SNAPSHOT.nar"
 
-##autoAck: true
 parallelism: 1
 
 configs:
-  ## sourceTask
-  task.class: "io.debezium.connector.mysql.MySqlConnectorTask"
-
   ## config for mysql, docker image: debezium/example-mysql:0.8
   database.hostname: "localhost"
   database.port: "3306"
@@ -39,15 +35,9 @@ configs:
   database.server.name: "dbserver1"
   database.whitelist: "inventory"
 
-  database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
-  database.history.pulsar.topic: "history-topic"
-  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
-  ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
-  key.converter: "org.apache.kafka.connect.json.JsonConverter"
-  value.converter: "org.apache.kafka.connect.json.JsonConverter"
   ## PULSAR_SERVICE_URL_CONFIG
   pulsar.service.url: "pulsar://127.0.0.1:6650"
-  ## OFFSET_STORAGE_TOPIC_CONFIG
-  offset.storage.topic: "offset-topic"
+  database.history.pulsar.topic: "mysql-history-topic"
+  offset.storage.topic: "mysql-offset-topic"
 
 
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
index ac6a5d9..37a50cc 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/pom.xml
@@ -21,6 +21,7 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
+  <packaging>pom</packaging>
   <parent>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar-io</artifactId>
@@ -30,75 +31,9 @@
   <artifactId>pulsar-io-debezium</artifactId>
   <name>Pulsar IO :: Debezium</name>
 
-  <dependencies>
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-io-core</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>io.debezium</groupId>
-      <artifactId>debezium-core</artifactId>
-      <version>${debezium.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>io.debezium</groupId>
-      <artifactId>debezium-connector-mysql</artifactId>
-      <version>${debezium.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>${kafka-client.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>connect-runtime</artifactId>
-      <version>${kafka-client.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-client-original</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-broker</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>managed-ledger-original</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-zookeeper-utils</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-broker</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-
-  </dependencies>
+  <modules>
+    <module>core</module>
+    <module>mysql</module>
+  </modules>
 
 </project>
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index cee7ec8..da5c620 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -39,12 +39,6 @@
     </dependency>
 
     <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-io-debezium</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
       <version>${kafka-client.version}</version>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index a164703..3ccbb4b 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.io.kafka.connect;
 
+import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG;
+
 import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
@@ -66,6 +68,7 @@ public class KafkaConnectSource implements Source<KeyValue<byte[],
byte[]>> {
     private CompletableFuture<Void> flushFuture;
     private OffsetBackingStore offsetStore;
     private OffsetStorageReader offsetReader;
+    private String topicNamespace;
     @Getter
     private OffsetStorageWriter offsetWriter;
     // number of outstandingRecords that have been polled but not been acked
@@ -86,6 +89,8 @@ public class KafkaConnectSource implements Source<KeyValue<byte[],
byte[]>> {
             .getDeclaredConstructor()
             .newInstance();
 
+        topicNamespace = stringConfig.get(TOPIC_NAMESPACE_CONFIG);
+
         // initialize the key and value converter
         keyConverter = ((Class<? extends Converter>)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG)))
             .asSubclass(Converter.class)
@@ -193,7 +198,7 @@ public class KafkaConnectSource implements Source<KeyValue<byte[],
byte[]>> {
                 .stream()
                 .map(e -> e.getKey() + "=" + e.getValue())
                 .collect(Collectors.joining(",")));
-            this.destinationTopic = Optional.of(srcRecord.topic());
+            this.destinationTopic = Optional.of(topicNamespace + "/" + srcRecord.topic());
         }
 
         @Override
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
index d00f776..624c59a 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
@@ -44,6 +44,12 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
     public static final String PULSAR_SERVICE_URL_CONFIG = "pulsar.service.url";
     private static final String PULSAR_SERVICE_URL_CONFIG_DOC = "pulsar service url";
 
+    /**
+     * <code>topic.namespace</code>
+     */
+    public static final String TOPIC_NAMESPACE_CONFIG = "topic.namespace";
+    private static final String TOPIC_NAMESPACE_CONFIG_DOC = "namespace of topic name to
store the output topics";
+
     static {
         CONFIG = new ConfigDef()
             .define(OFFSET_STORAGE_TOPIC_CONFIG,
@@ -53,10 +59,14 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
             .define(PULSAR_SERVICE_URL_CONFIG,
                 Type.STRING,
                 Importance.HIGH,
-                PULSAR_SERVICE_URL_CONFIG_DOC);
+                PULSAR_SERVICE_URL_CONFIG_DOC)
+            .define(TOPIC_NAMESPACE_CONFIG,
+                Type.STRING,
+                "public/default",
+                Importance.HIGH,
+                TOPIC_NAMESPACE_CONFIG_DOC);
     }
 
-
     public PulsarKafkaWorkerConfig(Map<String, String> props) {
         super(CONFIG, props);
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
index 2d0613e..6d87dd6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
@@ -23,7 +23,7 @@ import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
 
 public class DebeziumMySQLContainer extends ChaosContainer<DebeziumMySQLContainer>
{
 
-    public static final String NAME = "mysql";
+    public static final String NAME = "debezium-mysql-example";
     static final Integer[] PORTS = { 3306 };
 
     private static final String IMAGE_NAME = "debezium/example-mysql:0.8";
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 18eaba9..9c44a05 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -1163,7 +1163,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
         final String outputTopicName = "debe-output-topic-name";
-        final String consumeTopicName = "dbserver1.inventory.products";
+        final String consumeTopicName = "public/default/dbserver1.inventory.products";
         final String sourceName = "test-source-connector-"
             + functionRuntimeType + "-name-" + randomName(8);
 
@@ -1182,6 +1182,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
             .subscriptionType(SubscriptionType.Exclusive)
             .subscribe();
 
+        @Cleanup
         DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster);
 
         // setup debezium mysql server
@@ -1204,15 +1205,13 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
         waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages);
 
         // validate the source result
-        sourceTester.validateSourceResult(consumer, null);
+        sourceTester.validateSourceResult(consumer, 9);
 
         // delete the source
         deleteSource(tenant, namespace, sourceName);
 
         // get source info (source should be deleted)
         getSourceInfoNotFound(tenant, namespace, sourceName);
-
-        pulsarCluster.stopService("mysql", sourceTester.getDebeziumMySqlContainer());
     }
 
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
index b122ccc..ffecbc0 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
+import java.io.Closeable;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import lombok.Getter;
@@ -39,9 +40,9 @@ import org.testng.Assert;
  * which is a MySQL database server preconfigured with an inventory database.
  */
 @Slf4j
-public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContainer>
{
+public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContainer>
implements Closeable {
 
-    private static final String NAME = "kafka-connect-adaptor";
+    private static final String NAME = "debezium-mysql";
 
     private final String pulsarServiceUrl;
 
@@ -55,28 +56,21 @@ public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContain
         this.pulsarCluster = cluster;
         pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;
 
-        sourceConfig.put("task.class", "io.debezium.connector.mysql.MySqlConnectorTask");
-        sourceConfig.put("database.hostname", "mysql");
+        sourceConfig.put("database.hostname", DebeziumMySQLContainer.NAME);
         sourceConfig.put("database.port", "3306");
         sourceConfig.put("database.user", "debezium");
         sourceConfig.put("database.password", "dbz");
         sourceConfig.put("database.server.id", "184054");
         sourceConfig.put("database.server.name", "dbserver1");
         sourceConfig.put("database.whitelist", "inventory");
-        sourceConfig.put("database.history", "org.apache.pulsar.io.debezium.PulsarDatabaseHistory");
-        sourceConfig.put("database.history.pulsar.topic", "history-topic");
-        sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
-        sourceConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
-        sourceConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
         sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
-        sourceConfig.put("offset.storage.topic", "offset-topic");
     }
 
     @Override
     public void setServiceContainer(DebeziumMySQLContainer container) {
         log.info("start debezium mysql server container.");
         debeziumMySqlContainer = container;
-        pulsarCluster.startService("mysql", debeziumMySqlContainer);
+        pulsarCluster.startService(DebeziumMySQLContainer.NAME, debeziumMySqlContainer);
     }
 
     @Override
@@ -90,7 +84,7 @@ public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContain
         return null;
     }
 
-    public void validateSourceResult(Consumer<String> consumer, Map<String, String>
kvs) throws Exception {
+    public void validateSourceResult(Consumer<String> consumer, int number) throws
Exception {
         int recordsNumber = 0;
         Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
         while(msg != null) {
@@ -101,7 +95,15 @@ public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContain
             msg = consumer.receive(1, TimeUnit.SECONDS);
         }
 
-        Assert.assertEquals(recordsNumber, 9);
+        Assert.assertEquals(recordsNumber, number);
         log.info("Stop debezium mysql server container. topic: {} has {} records.", consumer.getTopic(),
recordsNumber);
     }
+
+    @Override
+    public void close() {
+        if (pulsarCluster != null) {
+            pulsarCluster.stopService(DebeziumMySQLContainer.NAME, debeziumMySqlContainer);
+        }
+    }
+
 }


Mime
View raw message