pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #2614: Debezium: add PulsarDatabaseHistory for debezium
Date Mon, 24 Sep 2018 06:16:04 GMT
sijie closed pull request #2614: Debezium: add PulsarDatabaseHistory for debezium
URL: https://github.com/apache/pulsar/pull/2614
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a fork-based pull request
once it has been merged, the complete diff is reproduced below for
the sake of provenance:

diff --git a/pom.xml b/pom.xml
index a8d68a480c..83b83fee33 100644
--- a/pom.xml
+++ b/pom.xml
@@ -177,6 +177,7 @@ flexible messaging model and an intuitive client API.</description>
     <presto.version>0.206</presto.version>
     <flink.version>1.6.0</flink.version>
     <scala.binary.version>2.11</scala.binary.version>
+    <debezium-core.version>0.8.2</debezium-core.version>
 
     <!-- test dependencies -->
     <arquillian-cube.version>1.15.1</arquillian-cube.version>
@@ -1348,7 +1349,7 @@ flexible messaging model and an intuitive client API.</description>
     <profile>
       <id>docker</id>
     </profile>
-    
+
     <profile>
       <!-- Checks style and licensing requirements. This is a good
            idea to run for contributions and for the release process. While it would
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
new file mode 100644
index 0000000000..5fefa43a74
--- /dev/null
+++ b/pulsar-io/debezium/pom.xml
@@ -0,0 +1,102 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-io</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-io-debezium</artifactId>
+  <name>Pulsar IO :: Debezium</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.debezium</groupId>
+      <artifactId>debezium-core</artifactId>
+      <version>${debezium-core.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_${scala.binary.version}</artifactId>
+      <version>${kafka-client.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>managed-ledger-original</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-zookeeper-utils</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+
+</project>
diff --git a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
new file mode 100644
index 0000000000..bc97fc6353
--- /dev/null
+++ b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.debezium;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+import io.debezium.annotation.ThreadSafe;
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.document.DocumentReader;
+import io.debezium.relational.history.AbstractDatabaseHistory;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Width;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
+
+/**
+ * A {@link DatabaseHistory} implementation that records schema changes as normal Pulsar messages on the specified topic,
+ * and that recovers the history by creating a Pulsar Reader that re-processes all messages on that topic.
+ */
+@Slf4j
+@ThreadSafe
+public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
+
+    public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic")
+        .withDisplayName("Database history topic name")
+        .withType(Type.STRING)
+        .withWidth(Width.LONG)
+        .withImportance(Importance.HIGH)
+        .withDescription("The name of the topic for the database schema history")
+        .withValidation(Field::isRequired);
+
+    public static final Field SERVICE_URL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING
+ "pulsar.service.url")
+        .withDisplayName("Kafka broker addresses")
+        .withType(Type.STRING)
+        .withWidth(Width.LONG)
+        .withImportance(Importance.HIGH)
+        .withDescription("Pulsar service url")
+        .withValidation(Field::isRequired);
+
+    public static Field.Set ALL_FIELDS = Field.setOf(
+        TOPIC,
+        SERVICE_URL,
+        DatabaseHistory.NAME);
+
+    private final DocumentReader reader = DocumentReader.defaultReader();
+    private String topicName;
+    private String serviceUrl;
+    private String dbHistoryName;
+    private volatile PulsarClient pulsarClient;
+    private volatile Producer<String> producer;
+
+
+    @Override
+    public void configure(Configuration config, HistoryRecordComparator comparator) {
+        super.configure(config, comparator);
+        if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
+            throw new IllegalArgumentException("Error configuring an instance of "
+                + getClass().getSimpleName() + "; check the logs for details");
+        }
+        this.topicName = config.getString(TOPIC);
+        this.serviceUrl = config.getString(SERVICE_URL);
+        // Copy the relevant portions of the configuration and add useful defaults ...
+        this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());
+
+        log.info("Configure to store the debezium database history {} to pulsar topic {}
at {}",
+            dbHistoryName, topicName, serviceUrl);
+    }
+
+    @Override
+    public void initializeStorage() {
+        super.initializeStorage();
+
+        // Simply try to publish an empty string so that the topic gets created
+        try (Producer<String> p = pulsarClient.newProducer(Schema.STRING).topic(topicName).create())
{
+            p.send("");
+        } catch (PulsarClientException pce) {
+            log.error("Failed to initialize storage", pce);
+            throw new RuntimeException("Failed to initialize storage", pce);
+        }
+    }
+
+    void setupClientIfNeeded() {
+        if (null == this.pulsarClient) {
+            try {
+                pulsarClient = PulsarClient.builder()
+                    .serviceUrl(serviceUrl)
+                    .build();
+            } catch (PulsarClientException e) {
+                throw new RuntimeException("Failed to create pulsar client to pulsar cluster
at "
+                    + serviceUrl, e);
+            }
+        }
+    }
+
+    void setupProducerIfNeeded() {
+        setupClientIfNeeded();
+        if (null == this.producer) {
+            try {
+                this.producer = pulsarClient.newProducer(Schema.STRING)
+                    .topic(topicName)
+                    .producerName(dbHistoryName)
+                    .blockIfQueueFull(true)
+                    .create();
+            } catch (PulsarClientException e) {
+                log.error("Failed to create pulsar producer to topic '{}' at cluster '{}'",
topicName, serviceUrl);
+                throw new RuntimeException("Failed to create pulsar producer to topic '"
+                    + topicName + "' at cluster '" + serviceUrl + "'", e);
+            }
+        }
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        setupProducerIfNeeded();
+    }
+
+    @Override
+    protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
+        if (this.producer == null) {
+            throw new IllegalStateException("No producer is available. Ensure that 'start()'"
+
+                " is called before storing database history records.");
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("Storing record into database history: {}", record);
+        }
+        try {
+            producer.send(record.toString());
+        } catch (PulsarClientException e) {
+            throw new DatabaseHistoryException(e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (this.producer != null) {
+                try {
+                    producer.flush();
+                } catch (PulsarClientException pce) {
+                    // ignore the error to ensure the client is eventually closed
+                } finally {
+                    this.producer.close();
+                }
+                this.producer = null;
+            }
+            if (this.pulsarClient != null) {
+                pulsarClient.close();
+                this.pulsarClient = null;
+            }
+        } catch (PulsarClientException pe) {
+            log.warn("Failed to closing pulsar client", pe);
+        }
+    }
+
+    @Override
+    protected void recoverRecords(Consumer<HistoryRecord> records) {
+        setupClientIfNeeded();
+        try (Reader<String> historyReader = pulsarClient.newReader(Schema.STRING)
+            .topic(topicName)
+            .startMessageId(MessageId.earliest)
+            .create()
+        ) {
+            log.info("Scanning the database history topic '{}'", topicName);
+
+            // Read all messages in the topic ...
+            MessageId lastProcessedMessageId = null;
+
+            // read the topic until the end
+            while (historyReader.hasMessageAvailable()) {
+                Message<String> msg = historyReader.readNext();
+                try {
+                    if (null == lastProcessedMessageId || lastProcessedMessageId.compareTo(msg.getMessageId())
< 0) {
+                        if (!isBlank(msg.getValue())) {
+                            HistoryRecord recordObj = new HistoryRecord(reader.read(msg.getValue()));
+                            if (log.isTraceEnabled()) {
+                                log.trace("Recovering database history: {}", recordObj);
+                            }
+                            if (recordObj == null || !recordObj.isValid()) {
+                                log.warn("Skipping invalid database history record '{}'.
" +
+                                        "This is often not an issue, but if it happens repeatedly
please check the '{}' topic.",
+                                    recordObj, topicName);
+                            } else {
+                                records.accept(recordObj);
+                                log.trace("Recovered database history: {}", recordObj);
+                            }
+                        }
+                        lastProcessedMessageId = msg.getMessageId();
+                    }
+                } catch (IOException ioe) {
+                    log.error("Error while deserializing history record '{}'", msg.getValue(),
ioe);
+                } catch (final Exception e) {
+                    throw e;
+                }
+            }
+            log.info("Successfully completed scanning the database history topic '{}'", topicName);
+        } catch (IOException ioe) {
+            log.error("Encountered issues on recovering history records", ioe);
+            throw new RuntimeException("Encountered issues on recovering history records",
ioe);
+        }
+    }
+
+    @Override
+    public boolean exists() {
+        setupClientIfNeeded();
+        try (Reader<String> historyReader = pulsarClient.newReader(Schema.STRING)
+            .topic(topicName)
+            .startMessageId(MessageId.earliest)
+            .create()
+        ) {
+            return historyReader.hasMessageAvailable();
+        } catch (IOException e) {
+            log.error("Encountered issues on checking existence of database history", e);
+            throw new RuntimeException("Encountered issues on checking existence of database
history", e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        if (topicName != null) {
+            return "Pulsar topic (" + topicName + ") at " + serviceUrl;
+        }
+        return "Pulsar topic";
+    }
+}
diff --git a/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
b/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
new file mode 100644
index 0000000000..e3b4fd94e7
--- /dev/null
+++ b/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.debezium;
+
+import static org.junit.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import io.debezium.config.Configuration;
+import io.debezium.relational.Tables;
+import io.debezium.relational.ddl.DdlParserSql2003;
+import io.debezium.relational.ddl.LegacyDdlParser;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.text.ParsingException;
+import io.debezium.util.Collect;
+import java.util.Map;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Test the implementation of {@link PulsarDatabaseHistory}.
+ */
+public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
+
+    private PulsarDatabaseHistory history;
+    private Map<String, Object> position;
+    private Map<String, String> source;
+    private String topicName;
+    private String ddl;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        source = Collect.hashMapOf("server", "my-server");
+        setLogPosition(0);
+        this.topicName = "persistent://my-property/my-ns/schema-changes-topic";
+        this.history = new PulsarDatabaseHistory();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private void testHistoryTopicContent(boolean skipUnparseableDDL) {
+        // Start up the history ...
+        Configuration config = Configuration.create()
+            .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString())
+            .with(PulsarDatabaseHistory.TOPIC, topicName)
+            .with(DatabaseHistory.NAME, "my-db-history")
+            .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL)
+            .build();
+        history.configure(config, null);
+        history.start();
+
+        // Should be able to call start more than once ...
+        history.start();
+
+        history.initializeStorage();
+
+        // Call it a second time to ensure we can work with a DB history topic that already exists
+        history.initializeStorage();
+
+        LegacyDdlParser recoveryParser = new DdlParserSql2003();
+        LegacyDdlParser ddlParser = new DdlParserSql2003();
+        ddlParser.setCurrentSchema("db1"); // recover does this, so we need to as well
+        Tables tables1 = new Tables();
+        Tables tables2 = new Tables();
+        Tables tables3 = new Tables();
+
+        // Recover from the very beginning ...
+        setLogPosition(0);
+        history.recover(source, position, tables1, recoveryParser);
+
+        // There should have been nothing to recover ...
+        assertEquals(tables1.size(), 0);
+
+        // Now record schema changes, which writes out to Pulsar but doesn't actually change the Tables ...
+        setLogPosition(10);
+        ddl = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \n" +
+            "CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100)
NOT NULL ); \n" +
+            "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, desc VARCHAR(255)
NOT NULL); \n";
+        history.record(source, position, "db1", ddl);
+
+        // Parse the DDL statement 3x and each time update a different Tables object ...
+        ddlParser.parse(ddl, tables1);
+        assertEquals(3, tables1.size());
+        ddlParser.parse(ddl, tables2);
+        assertEquals(3, tables2.size());
+        ddlParser.parse(ddl, tables3);
+        assertEquals(3, tables3.size());
+
+        // Record a drop statement and parse it for 2 of our 3 Tables...
+        setLogPosition(39);
+        ddl = "DROP TABLE foo;";
+        history.record(source, position, "db1", ddl);
+        ddlParser.parse(ddl, tables2);
+        assertEquals(2, tables2.size());
+        ddlParser.parse(ddl, tables3);
+        assertEquals(2, tables3.size());
+
+        // Record another DDL statement and parse it for 1 of our 3 Tables...
+        setLogPosition(10003);
+        ddl = "CREATE TABLE suppliers ( supplierId INTEGER NOT NULL PRIMARY KEY, name VARCHAR(255)
NOT NULL);";
+        history.record(source, position, "db1", ddl);
+        ddlParser.parse(ddl, tables3);
+        assertEquals(3, tables3.size());
+
+        // Stop the history (which should stop the producer) ...
+        history.stop();
+        history = new PulsarDatabaseHistory();
+        history.configure(config, null);
+        // no need to start
+
+        // Recover from the very beginning to just past the first change ...
+        Tables recoveredTables = new Tables();
+        setLogPosition(15);
+        history.recover(source, position, recoveredTables, recoveryParser);
+        assertEquals(recoveredTables, tables1);
+
+        // Recover from the very beginning to just past the second change ...
+        recoveredTables = new Tables();
+        setLogPosition(50);
+        history.recover(source, position, recoveredTables, recoveryParser);
+        assertEquals(recoveredTables, tables2);
+
+        // Recover from the very beginning to just past the third change ...
+        recoveredTables = new Tables();
+        setLogPosition(10010);
+        history.recover(source, position, recoveredTables, recoveryParser);
+        assertEquals(recoveredTables, tables3);
+
+        // Recover from the very beginning to way past the third change ...
+        recoveredTables = new Tables();
+        setLogPosition(100000010);
+        history.recover(source, position, recoveredTables, recoveryParser);
+        assertEquals(recoveredTables, tables3);
+    }
+
+    protected void setLogPosition(int index) {
+        this.position = Collect.hashMapOf("filename", "my-txn-file.log",
+            "position", index);
+    }
+
+    @Test
+    public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exception
{
+        // Create the empty topic ...
+        testHistoryTopicContent(false);
+    }
+
+    @Test
+    public void shouldIgnoreUnparseableMessages() throws Exception {
+        try (final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+            .topic(topicName)
+            .create()
+        ) {
+            producer.send("");
+            producer.send("{\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP
TABLE foo;\"}");
+            producer.send("{\"source\":{\"server\":\"my-server\"},\"databaseName\":\"db1\",\"ddl\":\"DROP
TABLE foo;\"}");
+            producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP
TABLE foo;\"");
+            producer.send("\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP
TABLE foo;\"}");
+            producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP
TABLE foo;\"}");
+        }
+
+        testHistoryTopicContent(true);
+    }
+
+    @Test(expectedExceptions = ParsingException.class)
+    public void shouldStopOnUnparseableSQL() throws Exception {
+        try (final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create())
{
+            producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP
TABLE foo;\"}");
+        }
+
+        testHistoryTopicContent(false);
+    }
+
+
+    @Test
+    public void testExists() {
+        // happy path
+        testHistoryTopicContent(true);
+        assertTrue(history.exists());
+
+        // Set history to use dummy topic
+        Configuration config = Configuration.create()
+            .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString())
+            .with(PulsarDatabaseHistory.TOPIC, "persistent://my-property/my-ns/dummytopic")
+            .with(DatabaseHistory.NAME, "my-db-history")
+            .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
+            .build();
+
+        history.configure(config, null);
+        history.start();
+
+        // dummytopic should not exist yet
+        assertFalse(history.exists());
+    }
+}
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 10c67c9997..5956d62145 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -43,6 +43,7 @@
     <module>jdbc</module>
     <module>data-genenator</module>
     <module>elastic-search</module>
+    <module>debezium</module>
   </modules>
 
 </project>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message