From commits-return-14979-archive-asf-public=cust-asf.ponee.io@pulsar.apache.org Mon Sep 24 08:16:07 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 6E522180675 for ; Mon, 24 Sep 2018 08:16:06 +0200 (CEST) Received: (qmail 40514 invoked by uid 500); 24 Sep 2018 06:16:05 -0000 Mailing-List: contact commits-help@pulsar.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.apache.org Delivered-To: mailing list commits@pulsar.apache.org Received: (qmail 40505 invoked by uid 99); 24 Sep 2018 06:16:05 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Sep 2018 06:16:05 +0000 From: GitBox To: commits@pulsar.apache.org Subject: [GitHub] sijie closed pull request #2614: Debezium: add PulsarDatabaseHistory for debezium Message-ID: <153776976466.11717.15327708725798184946.gitbox@gitbox.apache.org> Date: Mon, 24 Sep 2018 06:16:04 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit sijie closed pull request #2614: Debezium: add PulsarDatabaseHistory for debezium URL: https://github.com/apache/pulsar/pull/2614 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pom.xml b/pom.xml index a8d68a480c..83b83fee33 100644 --- a/pom.xml +++ b/pom.xml @@ -177,6 +177,7 @@ flexible messaging model and an intuitive client API. 0.206 1.6.0 2.11 + 0.8.2 1.15.1 @@ -1348,7 +1349,7 @@ flexible messaging model and an intuitive client API. 
docker - + + + 4.0.0 + + org.apache.pulsar + pulsar-io + 2.2.0-SNAPSHOT + + + pulsar-io-debezium + Pulsar IO :: Debezium + + + + + ${project.groupId} + pulsar-io-core + ${project.version} + + + + io.debezium + debezium-core + ${debezium-core.version} + + + + org.apache.kafka + kafka_${scala.binary.version} + ${kafka-client.version} + + + + ${project.groupId} + pulsar-client-original + ${project.version} + + + + ${project.groupId} + pulsar-broker + ${project.version} + test + + + + ${project.groupId} + managed-ledger-original + ${project.version} + test-jar + test + + + + ${project.groupId} + pulsar-zookeeper-utils + ${project.version} + test-jar + test + + + + ${project.groupId} + pulsar-broker + ${project.version} + test + test-jar + + + + + + + + org.apache.nifi + nifi-nar-maven-plugin + + + + + + diff --git a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java new file mode 100644 index 0000000000..bc97fc6353 --- /dev/null +++ b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.debezium; + +import static org.apache.commons.lang.StringUtils.isBlank; + +import io.debezium.annotation.ThreadSafe; +import io.debezium.config.Configuration; +import io.debezium.config.Field; +import io.debezium.document.DocumentReader; +import io.debezium.relational.history.AbstractDatabaseHistory; +import io.debezium.relational.history.DatabaseHistory; +import io.debezium.relational.history.DatabaseHistoryException; +import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.HistoryRecordComparator; +import java.io.IOException; +import java.util.UUID; +import java.util.function.Consumer; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Width; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; + +/** + * A {@link DatabaseHistory} implementation that records schema changes as normal pulsar messages on the specified topic, + * and that recovers the history by creating a Pulsar {@link Reader} that re-processes all messages on that topic, + * starting from {@link MessageId#earliest}. Blank messages (such as the one written by initializeStorage()) are + * skipped during recovery.
+ */ +@Slf4j +@ThreadSafe +public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { /* NOTE(review): issues in the methods further down this class. (1) stop(): producer.close() runs in a finally block; if it throws, control jumps to the outer catch, so pulsarClient.close() is skipped, the client leaks and both fields keep their old references. (2) recoverRecords(): recordObj == null can never be true right after new HistoryRecord(...), and catch (final Exception e) { throw e; } only rethrows. (3) exists() returns hasMessageAvailable() of a reader started at MessageId.earliest, so a topic that exists but holds no messages reports false. (4) Despite @ThreadSafe, setupClientIfNeeded() and setupProducerIfNeeded() do an unsynchronized check-then-act on volatile fields, so concurrent first calls could create duplicate clients or producers. (5) The warning text logged by stop() (Failed to closing pulsar client) is ungrammatical. */ + + public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic") + .withDisplayName("Database history topic name") + .withType(Type.STRING) + .withWidth(Width.LONG) + .withImportance(Importance.HIGH) + .withDescription("The name of the topic for the database schema history") + .withValidation(Field::isRequired); + + public static final Field SERVICE_URL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.service.url") + .withDisplayName("Kafka broker addresses") /* NOTE(review): this display name looks copied from the Kafka-based history; the field actually holds the Pulsar service URL */ + .withType(Type.STRING) + .withWidth(Width.LONG) + .withImportance(Importance.HIGH) + .withDescription("Pulsar service url") + .withValidation(Field::isRequired); + + public static Field.Set ALL_FIELDS = Field.setOf( + TOPIC, + SERVICE_URL, + DatabaseHistory.NAME); /* NOTE(review): public static but not final, so it can be reassigned from outside the class */ + + private final DocumentReader reader = DocumentReader.defaultReader(); + private String topicName; + private String serviceUrl; + private String dbHistoryName; + private volatile PulsarClient pulsarClient; + private volatile Producer producer; /* NOTE(review): type arguments look stripped in this archived copy; since the producer is built with Schema.STRING this was presumably a String producer, confirm against the repository */ + + + @Override + public void configure(Configuration config, HistoryRecordComparator comparator) { /* Validates the TOPIC, SERVICE_URL and DatabaseHistory.NAME settings (errors go to logger, presumably inherited from AbstractDatabaseHistory), then caches the topic, service URL and history name; no Pulsar connection is opened here. */ + super.configure(config, comparator); + if (!config.validateAndRecord(ALL_FIELDS, logger::error)) { + throw new IllegalArgumentException("Error configuring an instance of " + + getClass().getSimpleName() + "; check the logs for details"); + } + this.topicName = config.getString(TOPIC); + this.serviceUrl = config.getString(SERVICE_URL); + // Use the configured history name, or a random UUID if none is set; the name also becomes the producer name
+ this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString()); + + log.info("Configure to store the debezium database history {} to pulsar topic {} at {}", + dbHistoryName, topicName, serviceUrl); + } + + @Override + public void initializeStorage() { /* NOTE(review): uses pulsarClient without calling setupClientIfNeeded(), so this throws NullPointerException unless start(), exists() or recoverRecords() has run first. The empty message only forces topic creation; recoverRecords() skips blank values. */ + super.initializeStorage(); + + // publish an empty string so that the topic gets created + try (Producer p = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) { + p.send(""); + } catch (PulsarClientException pce) { + log.error("Failed to initialize storage", pce); + throw new RuntimeException("Failed to initialize storage", pce); + } + } + + void setupClientIfNeeded() { /* Lazily builds the PulsarClient for serviceUrl on first use. NOTE(review): unsynchronized check-then-act on a volatile field, so two concurrent first callers could each build a client and one would leak. */ + if (null == this.pulsarClient) { + try { + pulsarClient = PulsarClient.builder() + .serviceUrl(serviceUrl) + .build(); + } catch (PulsarClientException e) { + throw new RuntimeException("Failed to create pulsar client to pulsar cluster at " + + serviceUrl, e); + } + } + } + + void setupProducerIfNeeded() { /* Lazily creates the String-schema producer for topicName, named dbHistoryName, creating the client first if needed. */ + setupClientIfNeeded(); + if (null == this.producer) { + try { + this.producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .producerName(dbHistoryName) + .blockIfQueueFull(true) + .create(); + } catch (PulsarClientException e) { + log.error("Failed to create pulsar producer to topic '{}' at cluster '{}'", topicName, serviceUrl); + throw new RuntimeException("Failed to create pulsar producer to topic '" + + topicName + "' at cluster '" + serviceUrl + "'", e); + } + } + } + + @Override + public void start() { /* Besides super.start(), eagerly creates the client and producer; repeated calls reuse them. */ + super.start(); + setupProducerIfNeeded(); + } + + @Override + protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException { /* Synchronously sends record.toString() as one message per history record. NOTE(review): assumes HistoryRecord.toString() yields the document text that recoverRecords() parses back with DocumentReader, confirm. */ + if (this.producer == null) { + throw new IllegalStateException("No producer is available.
Ensure that 'start()'" + " is called before storing database history records."); + } + if (log.isTraceEnabled()) { + log.trace("Storing record into database history: {}", record); + } + try { + producer.send(record.toString()); + } catch (PulsarClientException e) { + throw new DatabaseHistoryException(e); + } + } + + @Override + public void stop() { + try { + if (this.producer != null) { + try { + producer.flush(); + } catch (PulsarClientException pce) { + // ignore the error to ensure the client is eventually closed + } finally { + this.producer.close(); + } + this.producer = null; + } + if (this.pulsarClient != null) { + pulsarClient.close(); + this.pulsarClient = null; + } + } catch (PulsarClientException pe) { + log.warn("Failed to closing pulsar client", pe); + } + } + + @Override + protected void recoverRecords(Consumer records) { + setupClientIfNeeded(); + try (Reader historyReader = pulsarClient.newReader(Schema.STRING) + .topic(topicName) + .startMessageId(MessageId.earliest) + .create() + ) { + log.info("Scanning the database history topic '{}'", topicName); + + // Read all messages in the topic ... + MessageId lastProcessedMessageId = null; + + // read the topic until the end + while (historyReader.hasMessageAvailable()) { + Message msg = historyReader.readNext(); + try { + if (null == lastProcessedMessageId || lastProcessedMessageId.compareTo(msg.getMessageId()) < 0) { + if (!isBlank(msg.getValue())) { + HistoryRecord recordObj = new HistoryRecord(reader.read(msg.getValue())); + if (log.isTraceEnabled()) { + log.trace("Recovering database history: {}", recordObj); + } + if (recordObj == null || !recordObj.isValid()) { + log.warn("Skipping invalid database history record '{}'.
" + "This is often not an issue, but if it happens repeatedly please check the '{}' topic.", + recordObj, topicName); + } else { + records.accept(recordObj); + log.trace("Recovered database history: {}", recordObj); + } + } + lastProcessedMessageId = msg.getMessageId(); + } + } catch (IOException ioe) { + log.error("Error while deserializing history record '{}'", msg.getValue(), ioe); + } catch (final Exception e) { + throw e; + } + } + log.info("Successfully completed scanning the database history topic '{}'", topicName); + } catch (IOException ioe) { + log.error("Encountered issues on recovering history records", ioe); + throw new RuntimeException("Encountered issues on recovering history records", ioe); + } + } + + @Override + public boolean exists() { + setupClientIfNeeded(); + try (Reader historyReader = pulsarClient.newReader(Schema.STRING) + .topic(topicName) + .startMessageId(MessageId.earliest) + .create() + ) { + return historyReader.hasMessageAvailable(); + } catch (IOException e) { + log.error("Encountered issues on checking existence of database history", e); + throw new RuntimeException("Encountered issues on checking existence of database history", e); + } + } + + @Override + public String toString() { + if (topicName != null) { + return "Pulsar topic (" + topicName + ") at " + serviceUrl; + } + return "Pulsar topic"; + } +} diff --git a/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java b/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java new file mode 100644 index 0000000000..e3b4fd94e7 --- /dev/null +++ b/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.debezium; + +import static org.junit.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import io.debezium.config.Configuration; +import io.debezium.relational.Tables; +import io.debezium.relational.ddl.DdlParserSql2003; +import io.debezium.relational.ddl.LegacyDdlParser; +import io.debezium.relational.history.DatabaseHistory; +import io.debezium.text.ParsingException; +import io.debezium.util.Collect; +import java.util.Map; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Test the implementation of {@link PulsarDatabaseHistory}. 
+ */ +public class PulsarDatabaseHistoryTest extends ProducerConsumerBase { /* NOTE(review): assertEquals is statically imported from org.junit.Assert, while assertTrue, assertFalse and the test annotations come from TestNG. JUnit takes (expected, actual) and TestNG takes (actual, expected), and the calls below use both orders, so failure messages may swap the two values. */ + + private PulsarDatabaseHistory history; + private Map position; + private Map source; /* NOTE(review): type arguments look stripped in this archived copy; both maps are built with Collect.hashMapOf using String keys, confirm the original declarations */ + private String topicName; + private String ddl; + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + + source = Collect.hashMapOf("server", "my-server"); + setLogPosition(0); + this.topicName = "persistent://my-property/my-ns/schema-changes-topic"; + this.history = new PulsarDatabaseHistory(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + private void testHistoryTopicContent(boolean skipUnparseableDDL) { /* Shared scenario: configures and starts the history, records three DDL batches at log positions 10, 39 and 10003, then re-creates the history and checks that recovering up to positions 15, 50, 10010 and 100000010 yields the expected tables. skipUnparseableDDL is passed through as DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS. */ + // Start up the history ... + Configuration config = Configuration.create() + .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString()) + .with(PulsarDatabaseHistory.TOPIC, topicName) + .with(DatabaseHistory.NAME, "my-db-history") + .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL) + .build(); + history.configure(config, null); + history.start(); + + // Should be able to call start more than once ... + history.start(); + + history.initializeStorage(); + + // Calling it another time to ensure we can work with the DB history topic already existing + history.initializeStorage(); + + LegacyDdlParser recoveryParser = new DdlParserSql2003(); + LegacyDdlParser ddlParser = new DdlParserSql2003(); + ddlParser.setCurrentSchema("db1"); // recover does this, so we need to as well + Tables tables1 = new Tables(); + Tables tables2 = new Tables(); + Tables tables3 = new Tables(); + + // Recover from the very beginning ... + setLogPosition(0); + history.recover(source, position, tables1, recoveryParser); + + // There should have been nothing to recover ... + assertEquals(tables1.size(), 0); + + // Now record schema changes, which writes them to the Pulsar topic but doesn't actually change the Tables ...
+ setLogPosition(10); + ddl = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \n" + + "CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" + + "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, desc VARCHAR(255) NOT NULL); \n"; + history.record(source, position, "db1", ddl); + + // Parse the DDL statement 3x and each time update a different Tables object ... + ddlParser.parse(ddl, tables1); + assertEquals(3, tables1.size()); + ddlParser.parse(ddl, tables2); + assertEquals(3, tables2.size()); + ddlParser.parse(ddl, tables3); + assertEquals(3, tables3.size()); + + // Record a drop statement and parse it for 2 of our 3 Tables... + setLogPosition(39); + ddl = "DROP TABLE foo;"; + history.record(source, position, "db1", ddl); + ddlParser.parse(ddl, tables2); + assertEquals(2, tables2.size()); + ddlParser.parse(ddl, tables3); + assertEquals(2, tables3.size()); + + // Record another DDL statement and parse it for 1 of our 3 Tables... + setLogPosition(10003); + ddl = "CREATE TABLE suppliers ( supplierId INTEGER NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL);"; + history.record(source, position, "db1", ddl); + ddlParser.parse(ddl, tables3); + assertEquals(3, tables3.size()); + + // Stop the history (which should stop the producer) ... + history.stop(); + history = new PulsarDatabaseHistory(); + history.configure(config, null); + // no need to start + + // Recover from the very beginning to just past the first change ... + Tables recoveredTables = new Tables(); + setLogPosition(15); + history.recover(source, position, recoveredTables, recoveryParser); + assertEquals(recoveredTables, tables1); + + // Recover from the very beginning to just past the second change ... + recoveredTables = new Tables(); + setLogPosition(50); + history.recover(source, position, recoveredTables, recoveryParser); + assertEquals(recoveredTables, tables2); + + // Recover from the very beginning to just past the third change ...
+ recoveredTables = new Tables(); + setLogPosition(10010); + history.recover(source, position, recoveredTables, recoveryParser); + assertEquals(recoveredTables, tables3); + + // Recover from the very beginning to way past the third change ... + recoveredTables = new Tables(); + setLogPosition(100000010); + history.recover(source, position, recoveredTables, recoveryParser); + assertEquals(recoveredTables, tables3); + } + + protected void setLogPosition(int index) { + this.position = Collect.hashMapOf("filename", "my-txn-file.log", + "position", index); + } + + @Test + public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exception { + // Create the empty topic ... + testHistoryTopicContent(false); + } + + @Test + public void shouldIgnoreUnparseableMessages() throws Exception { /* Pre-populates the topic with an empty message and five invalid records: missing source, missing position, missing closing brace, missing opening brace, and unparseable DDL. testHistoryTopicContent(true) then expects recovery to produce the same tables as if these messages were absent. */ + try (final Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .create() + ) { + producer.send(""); + producer.send("{\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}"); + producer.send("{\"source\":{\"server\":\"my-server\"},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}"); + producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\""); + producer.send("\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}"); + producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP TABLE foo;\"}"); + } + + testHistoryTopicContent(true); + } + + @Test(expectedExceptions = ParsingException.class) + public void shouldStopOnUnparseableSQL() throws Exception { /* Pre-populates the topic with a record whose DDL cannot be parsed; with skipUnparseableDDL set to false, recovery is expected to throw ParsingException. */ + try (final Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
+ producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP TABLE foo;\"}"); + } + + testHistoryTopicContent(false); + } + + + @Test + public void testExists() { /* NOTE(review): history.start() below opens a producer on dummytopic, which presumably lets the broker auto-create that topic; exists() still returns false because it only checks whether a message is available. The started history is never stopped in this test. */ + // happy path + testHistoryTopicContent(true); + assertTrue(history.exists()); + + // Set history to use dummy topic + Configuration config = Configuration.create() + .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString()) + .with(PulsarDatabaseHistory.TOPIC, "persistent://my-property/my-ns/dummytopic") + .with(DatabaseHistory.NAME, "my-db-history") + .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true) + .build(); + + history.configure(config, null); + history.start(); + + // dummytopic has no messages yet, so exists() reports false + assertFalse(history.exists()); + } +} diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml index 10c67c9997..5956d62145 100644 --- a/pulsar-io/pom.xml +++ b/pulsar-io/pom.xml @@ -43,6 +43,7 @@ jdbc data-genenator elastic-search + debezium ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services