pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: [Pulsar IO]: Add a source connector for debezium postgres (#3924)
Date Fri, 29 Mar 2019 21:39:25 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4150f47  [Pulsar IO]: Add a source connector for debezium postgres (#3924)
4150f47 is described below

commit 4150f472860b907e6c6a47039f110fbec496a684
Author: Jia Zhai <zhaijia@apache.org>
AuthorDate: Sat Mar 30 05:39:20 2019 +0800

    [Pulsar IO]: Add a source connector for debezium postgres (#3924)
    
    This PR adds a source connector for Debezium Postgres.
    
    changes:
    - add debezium postgres project;
    - add DebeziumSource, and make mysql and postgres both inherit from it.
---
 pom.xml                                            |  2 +-
 .../apache/pulsar/io/debezium/DebeziumSource.java} | 35 ++++-----
 .../io/debezium/mysql/DebeziumMysqlSource.java     | 82 ++--------------------
 pulsar-io/debezium/pom.xml                         |  1 +
 pulsar-io/debezium/{ => postgres}/pom.xml          | 37 +++++++---
 .../debezium/postgres/DebeziumPostgresSource.java  | 37 ++++++++++
 .../resources/META-INF/services/pulsar-io.yaml     | 22 ++++++
 .../resources/debezium-postgres-source-config.yaml | 41 +++++++++++
 8 files changed, 148 insertions(+), 109 deletions(-)

diff --git a/pom.xml b/pom.xml
index 539b5b8..d7aa9fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -187,7 +187,7 @@ flexible messaging model and an intuitive client API.</description>
     <presto.version>0.206</presto.version>
     <flink.version>1.6.0</flink.version>
     <scala.binary.version>2.11</scala.binary.version>
-    <debezium.version>0.8.2</debezium.version>
+    <debezium.version>0.9.2.Final</debezium.version>
     <jsonwebtoken.version>0.10.5</jsonwebtoken.version>
     <opencensus.version>0.18.0</opencensus.version>
     <zstd.version>1.3.7-3</zstd.version>
diff --git a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
similarity index 78%
copy from pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
copy to pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
index e8fe0c3..c207a95 100644
--- a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
@@ -16,32 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.debezium.mysql;
+package org.apache.pulsar.io.debezium;
 
 import java.util.Map;
 
-import io.debezium.connector.mysql.MySqlConnectorConfig;
-import lombok.extern.slf4j.Slf4j;
+import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.io.core.SourceContext;
-import org.apache.pulsar.io.debezium.PulsarDatabaseHistory;
 import org.apache.pulsar.io.kafka.connect.KafkaConnectSource;
 import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;
 
-/**
- * A pulsar source that runs
- */
-@Slf4j
-public class DebeziumMysqlSource extends KafkaConnectSource {
-    static private final String DEFAULT_TASK = "io.debezium.connector.mysql.MySqlConnectorTask";
+public abstract class DebeziumSource extends KafkaConnectSource {
     static private final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
     static private final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarDatabaseHistory";
-    static private final String DEFAULT_OFFSET_TOPIC = "debezium-mysql-offset-topic";
-    static private final String DEFAULT_HISTORY_TOPIC = "debezium-mysql-history-topic";
+    static private final String DEFAULT_OFFSET_TOPIC = "debezium-offset-topic";
+    static private final String DEFAULT_HISTORY_TOPIC = "debezium-history-topic";
 
-    private static void throwExceptionIfConfigNotMatch(Map<String, Object> config,
+    public static void throwExceptionIfConfigNotMatch(Map<String, Object> config,
                                                        String key,
                                                        String value) throws IllegalArgumentException
{
         Object orig = config.get(key);
@@ -56,15 +48,15 @@ public class DebeziumMysqlSource extends KafkaConnectSource {
         }
     }
 
-    private static void setConfigIfNull(Map<String, Object> config, String key, String
value) {
+    public static void setConfigIfNull(Map<String, Object> config, String key, String
value) {
         Object orig = config.get(key);
         if (orig == null) {
             config.put(key, value);
         }
     }
 
-    // namespace: tenant/namespace
-    private static String topicNamespace(SourceContext sourceContext) {
+    // namespace for output topics, default value is "tenant/namespace"
+    public static String topicNamespace(SourceContext sourceContext) {
         String tenant = sourceContext.getTenant();
         String namespace = sourceContext.getNamespace();
 
@@ -72,18 +64,19 @@ public class DebeziumMysqlSource extends KafkaConnectSource {
             (StringUtils.isEmpty(namespace) ? TopicName.DEFAULT_NAMESPACE : namespace);
     }
 
+    public abstract void setDbConnectorTask(Map<String, Object> config) throws Exception;
+
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) throws
Exception {
-        // connector task
-        throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK);
+        setDbConnectorTask(config);
 
         // key.converter
         setConfigIfNull(config, PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
         // value.converter
         setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
 
-        // database.history implementation class
-        setConfigIfNull(config, MySqlConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY);
+        // database.history : implementation class for database history.
+        setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(),
DEFAULT_HISTORY);
 
         // database.history.pulsar.service.url, this is set as the value of pulsar.service.url
if null.
         String serviceUrl = (String) config.get(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG);
diff --git a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java b/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
index e8fe0c3..316abe3 100644
--- a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
+++ b/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
@@ -20,91 +20,17 @@ package org.apache.pulsar.io.debezium.mysql;
 
 import java.util.Map;
 
-import io.debezium.connector.mysql.MySqlConnectorConfig;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.connect.runtime.TaskConfig;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.io.core.SourceContext;
-import org.apache.pulsar.io.debezium.PulsarDatabaseHistory;
-import org.apache.pulsar.io.kafka.connect.KafkaConnectSource;
-import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;
+import org.apache.pulsar.io.debezium.DebeziumSource;
 
 /**
- * A pulsar source that runs
+ * A pulsar source that runs debezium mysql source
  */
-@Slf4j
-public class DebeziumMysqlSource extends KafkaConnectSource {
+public class DebeziumMysqlSource extends DebeziumSource {
     static private final String DEFAULT_TASK = "io.debezium.connector.mysql.MySqlConnectorTask";
-    static private final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
-    static private final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarDatabaseHistory";
-    static private final String DEFAULT_OFFSET_TOPIC = "debezium-mysql-offset-topic";
-    static private final String DEFAULT_HISTORY_TOPIC = "debezium-mysql-history-topic";
-
-    private static void throwExceptionIfConfigNotMatch(Map<String, Object> config,
-                                                       String key,
-                                                       String value) throws IllegalArgumentException
{
-        Object orig = config.get(key);
-        if (orig == null) {
-            config.put(key, value);
-            return;
-        }
-
-        // throw exception if value not match
-        if (!orig.equals(value)) {
-            throw new IllegalArgumentException("Expected " + value + " but has " + orig);
-        }
-    }
-
-    private static void setConfigIfNull(Map<String, Object> config, String key, String
value) {
-        Object orig = config.get(key);
-        if (orig == null) {
-            config.put(key, value);
-        }
-    }
-
-    // namespace: tenant/namespace
-    private static String topicNamespace(SourceContext sourceContext) {
-        String tenant = sourceContext.getTenant();
-        String namespace = sourceContext.getNamespace();
-
-        return (StringUtils.isEmpty(tenant) ? TopicName.PUBLIC_TENANT : tenant) + "/" +
-            (StringUtils.isEmpty(namespace) ? TopicName.DEFAULT_NAMESPACE : namespace);
-    }
 
     @Override
-    public void open(Map<String, Object> config, SourceContext sourceContext) throws
Exception {
-        // connector task
+    public void setDbConnectorTask(Map<String, Object> config) throws Exception {
         throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK);
-
-        // key.converter
-        setConfigIfNull(config, PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
-        // value.converter
-        setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
-
-        // database.history implementation class
-        setConfigIfNull(config, MySqlConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY);
-
-        // database.history.pulsar.service.url, this is set as the value of pulsar.service.url
if null.
-        String serviceUrl = (String) config.get(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG);
-        if (serviceUrl == null) {
-            throw new IllegalArgumentException("Pulsar service URL not provided.");
-        }
-        setConfigIfNull(config, PulsarDatabaseHistory.SERVICE_URL.name(), serviceUrl);
-
-        String topicNamespace = topicNamespace(sourceContext);
-        // topic.namespace
-        setConfigIfNull(config, PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, topicNamespace);
-
-        String sourceName = sourceContext.getSourceName();
-        // database.history.pulsar.topic: history topic name
-        setConfigIfNull(config, PulsarDatabaseHistory.TOPIC.name(),
-            topicNamespace + "/" + sourceName + "-" + DEFAULT_HISTORY_TOPIC);
-        // offset.storage.topic: offset topic name
-        setConfigIfNull(config, PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
-            topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC);
-
-        super.open(config, sourceContext);
     }
-
 }
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
index 37a50cc..5d6b3a1 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/pom.xml
@@ -34,6 +34,7 @@
   <modules>
     <module>core</module>
     <module>mysql</module>
+    <module>postgres</module>
   </modules>
 
 </project>
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/postgres/pom.xml
similarity index 56%
copy from pulsar-io/debezium/pom.xml
copy to pulsar-io/debezium/postgres/pom.xml
index 37a50cc..927db6e 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/postgres/pom.xml
@@ -19,21 +19,40 @@
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <packaging>pom</packaging>
   <parent>
     <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar-io</artifactId>
+    <artifactId>pulsar-io-debezium</artifactId>
     <version>2.4.0-SNAPSHOT</version>
   </parent>
 
-  <artifactId>pulsar-io-debezium</artifactId>
-  <name>Pulsar IO :: Debezium</name>
+  <artifactId>pulsar-io-debezium-postgres</artifactId>
+  <name>Pulsar IO :: Debezium :: postgres</name>
 
-  <modules>
-    <module>core</module>
-    <module>mysql</module>
-  </modules>
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-debezium-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.debezium</groupId>
+      <artifactId>debezium-connector-postgres</artifactId>
+      <version>${debezium.version}</version>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
 
 </project>
diff --git a/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java b/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java
new file mode 100644
index 0000000..fcd539e
--- /dev/null
+++ b/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.debezium.postgres;
+
+import java.util.Map;
+
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.pulsar.io.debezium.DebeziumSource;
+
+
+/**
+ * A pulsar source that runs debezium postgres source
+ */
+public class DebeziumPostgresSource extends DebeziumSource {
+    static private final String DEFAULT_TASK = "io.debezium.connector.postgresql.PostgresConnectorTask";
+
+    @Override
+    public void setDbConnectorTask(Map<String, Object> config) throws Exception {
+        throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK);
+    }
+}
diff --git a/pulsar-io/debezium/postgres/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/debezium/postgres/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..6577d7d
--- /dev/null
+++ b/pulsar-io/debezium/postgres/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: debezium-postgres
+description: Debezium Postgres Source
+sourceClass: org.apache.pulsar.io.debezium.postgres.DebeziumPostgresSource
diff --git a/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml b/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
new file mode 100644
index 0000000..e24f2e1
--- /dev/null
+++ b/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+tenant: "public"
+namespace: "default"
+name: "debezium-postgres-source"
+topicName: "debezium-postgres-topic"
+archive: "connectors/pulsar-io-debezium-postgres-2.4.0-SNAPSHOT.nar"
+
+parallelism: 1
+
+configs:
+  ## config for pg, docker image: debezium/example-postgres:0.8
+  database.hostname: "localhost"
+  database.port: "5432"
+  database.user: "postgres"
+  database.password: "postgres"
+  database.dbname: "postgres"
+  database.server.name: "dbserver1"
+  schema.whitelist: "inventory"
+
+  ## PULSAR_SERVICE_URL_CONFIG
+  pulsar.service.url: "pulsar://127.0.0.1:6650"
+
+


Mime
View raw message