pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: a simple debezium integration test (#3154)
Date Tue, 11 Dec 2018 00:08:43 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1260bf3  a simple debezium integration test (#3154)
1260bf3 is described below

commit 1260bf32c51a53c32a4b524407fd2689240611ab
Author: Jia Zhai <jiazhai@users.noreply.github.com>
AuthorDate: Tue Dec 11 08:08:37 2018 +0800

    a simple debezium integration test (#3154)
    
    ### Motivation
    
    create a simple debezium integration test.
    In this test, it start a Debezium MySQL Container based on "debezium/example-mysql:0.8",
then use debezium source connector to binlog from MySQL, and store the debezium output into
Pulsar.
    It verify the consumer readout message number is as expected.
    
    ### Modifications
    
     create a simple debezium integration test.
    
    ### Result
    
    this test passed.
---
 .../containers/DebeziumMySQLContainer.java         |  56 +++++++++++
 .../integration/functions/PulsarFunctionsTest.java |  67 ++++++++++++-
 .../integration/io/DebeziumMySqlSourceTester.java  | 107 +++++++++++++++++++++
 3 files changed, 229 insertions(+), 1 deletion(-)

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
new file mode 100644
index 0000000..2d0613e
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.containers;
+
+
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+
+public class DebeziumMySQLContainer extends ChaosContainer<DebeziumMySQLContainer>
{
+
+    public static final String NAME = "mysql";
+    static final Integer[] PORTS = { 3306 };
+
+    private static final String IMAGE_NAME = "debezium/example-mysql:0.8";
+
+    public DebeziumMySQLContainer(String clusterName) {
+        super(clusterName, IMAGE_NAME);
+        this.withEnv("MYSQL_USER", "mysqluser");
+        this.withEnv("MYSQL_PASSWORD", "mysqlpw");
+        this.withEnv("MYSQL_ROOT_PASSWORD", "debezium");
+
+    }
+
+    @Override
+    public String getContainerName() {
+        return clusterName;
+    }
+
+    @Override
+    protected void configure() {
+        super.configure();
+        this.withNetworkAliases(NAME)
+            .withExposedPorts(PORTS)
+            .withCreateContainerCmdModifier(createContainerCmd -> {
+                createContainerCmd.withHostName(NAME);
+                createContainerCmd.withName(getContainerName());
+            })
+            .waitingFor(new HostPortWaitStrategy());
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index f100c21..94aad28 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
 import org.apache.pulsar.functions.api.examples.serde.CustomObject;
+import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
@@ -100,6 +101,11 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
         testSink(new ElasticSearchSinkTester(), true);
     }
 
+    @Test
+    public void testDebeziumMySqlSource() throws Exception {
+        testDebeziumMySqlConnect();
+    }
+
     private void testSink(SinkTester tester, boolean builtin) throws Exception {
         tester.startServiceContainer(pulsarCluster);
         try {
@@ -1130,4 +1136,63 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
             assertEquals("value-" + i, msg.getValue());
         }
     }
-}
\ No newline at end of file
+
+    private  void testDebeziumMySqlConnect()
+        throws Exception {
+
+        final String tenant = TopicName.PUBLIC_TENANT;
+        final String namespace = TopicName.DEFAULT_NAMESPACE;
+        final String outputTopicName = "debe-output-topic-name";
+        final String consumeTopicName = "dbserver1.inventory.products";
+        final String sourceName = "test-source-connector-"
+            + functionRuntimeType + "-name-" + randomName(8);
+
+        // This is the binlog count that contained in mysql container.
+        final int numMessages = 47;
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+
+        @Cleanup
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+            .topic(consumeTopicName)
+            .subscriptionName("debezium-source-tester")
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscribe();
+
+        DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster);
+
+        // setup debezium mysql server
+        DebeziumMySQLContainer mySQLContainer = new DebeziumMySQLContainer(pulsarCluster.getClusterName());
+        sourceTester.setServiceContainer(mySQLContainer);
+
+        // prepare the testing environment for source
+        prepareSource(sourceTester);
+
+        // submit the source connector
+        submitSourceConnector(sourceTester, tenant, namespace, sourceName, outputTopicName);
+
+        // get source info
+        getSourceInfoSuccess(sourceTester, tenant, namespace, sourceName);
+
+        // get source status
+        getSourceStatus(tenant, namespace, sourceName);
+
+        // wait for source to process messages
+        waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages);
+
+        // validate the source result
+        sourceTester.validateSourceResult(consumer, null);
+
+        // delete the source
+        deleteSource(tenant, namespace, sourceName);
+
+        // get source info (source should be deleted)
+        getSourceInfoNotFound(tenant, namespace, sourceName);
+
+        pulsarCluster.stopService("mysql", sourceTester.getDebeziumMySqlContainer());
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
new file mode 100644
index 0000000..b122ccc
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.Assert;
+
+/**
+ * A tester for testing Debezium MySQL source.
+ *
+ * It reads binlog from MySQL, and store the debezium output into Pulsar.
+ * This test verify that the target topic contains wanted number messages.
+ *
+ * Debezium MySQL Container is "debezium/example-mysql:0.8",
+ * which is a MySQL database server preconfigured with an inventory database.
+ */
+@Slf4j
+public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContainer>
{
+
+    private static final String NAME = "kafka-connect-adaptor";
+
+    private final String pulsarServiceUrl;
+
+    @Getter
+    private DebeziumMySQLContainer debeziumMySqlContainer;
+
+    private final PulsarCluster pulsarCluster;
+
+    public DebeziumMySqlSourceTester(PulsarCluster cluster) {
+        super(NAME);
+        this.pulsarCluster = cluster;
+        pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;
+
+        sourceConfig.put("task.class", "io.debezium.connector.mysql.MySqlConnectorTask");
+        sourceConfig.put("database.hostname", "mysql");
+        sourceConfig.put("database.port", "3306");
+        sourceConfig.put("database.user", "debezium");
+        sourceConfig.put("database.password", "dbz");
+        sourceConfig.put("database.server.id", "184054");
+        sourceConfig.put("database.server.name", "dbserver1");
+        sourceConfig.put("database.whitelist", "inventory");
+        sourceConfig.put("database.history", "org.apache.pulsar.io.debezium.PulsarDatabaseHistory");
+        sourceConfig.put("database.history.pulsar.topic", "history-topic");
+        sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
+        sourceConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        sourceConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
+        sourceConfig.put("offset.storage.topic", "offset-topic");
+    }
+
+    @Override
+    public void setServiceContainer(DebeziumMySQLContainer container) {
+        log.info("start debezium mysql server container.");
+        debeziumMySqlContainer = container;
+        pulsarCluster.startService("mysql", debeziumMySqlContainer);
+    }
+
+    @Override
+    public void prepareSource() throws Exception {
+        log.info("debezium mysql server already contains preconfigured data.");
+    }
+
+    @Override
+    public Map<String, String> produceSourceMessages(int numMessages) throws Exception
{
+        log.info("debezium mysql server already contains preconfigured data.");
+        return null;
+    }
+
+    public void validateSourceResult(Consumer<String> consumer, Map<String, String>
kvs) throws Exception {
+        int recordsNumber = 0;
+        Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+        while(msg != null) {
+            recordsNumber ++;
+            log.info("Received message: {}.", msg.getValue());
+            Assert.assertTrue(msg.getValue().contains("dbserver1.inventory.products"));
+            consumer.acknowledge(msg);
+            msg = consumer.receive(1, TimeUnit.SECONDS);
+        }
+
+        Assert.assertEquals(recordsNumber, 9);
+        log.info("Stop debezium mysql server container. topic: {} has {} records.", consumer.getTopic(),
recordsNumber);
+    }
+}


Mime
View raw message