From: GitBox
To: commits@pulsar.apache.org
Reply-To: dev@pulsar.apache.org
Subject: [GitHub] [pulsar] tuteng commented on a change in pull request #5181: [Doc] Update *Develop Connectors Guide*
Message-ID: <156850612348.3556.5772598634222322145.gitbox@gitbox.apache.org>
Date: Sun, 15 Sep 2019 00:08:43 -0000

tuteng commented on a change in pull request #5181: [Doc] Update *Develop Connectors Guide*
URL: https://github.com/apache/pulsar/pull/5181#discussion_r324441631

##########
File path: site2/docs/io-develop.md
##########
@@ -1,156 +1,188 @@
 ---
 id: io-develop
-title: Develop Connectors
-sidebar_label: Developing Connectors
+title: How to develop Pulsar connectors
+sidebar_label: Develop
 ---
-This guide describes how developers can write new connectors for Pulsar IO to move data
-between Pulsar and other systems. It describes how to create a Pulsar IO connector.
+This guide describes how to develop Pulsar connectors to move data
+between Pulsar and other systems.
-Pulsar IO connectors are specialized [Pulsar Functions](functions-overview.md). So writing
-a Pulsar IO connector is as simple as writing a Pulsar function. Pulsar IO connectors come
-in two flavors: {@inject: github:`Source`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java},
-which import data from another system, and {@inject: github:`Sink`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java},
-which export data to another system. For example, [KinesisSink](io-kinesis.md) would export
-the messages of a Pulsar topic to a Kinesis stream, and [RabbitmqSource](io-rabbitmq.md) would import
-the messages of a RabbitMQ queue to a Pulsar topic.
+Pulsar connectors are special [Pulsar Functions](functions-overview.md), so creating
+a Pulsar connector is similar to creating a Pulsar function.
-### Developing
+Pulsar connectors come in two types:
-#### Develop a source connector
+| Type | Description | Example
+|---|---|---
+{@inject: github:`Source`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java}|Import data from another system to Pulsar.|[RabbitMQ source connector](io-rabbitmq.md) imports the messages of a RabbitMQ queue to a Pulsar topic.
+{@inject: github:`Sink`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java}|Export data from Pulsar to another system.|[Kinesis sink connector](io-kinesis.md) exports the messages of a Pulsar topic to a Kinesis stream.
-What you need to develop a source connector is to implement {@inject: github:`Source`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java}
-interface.
+## Develop
-First, you need to implement the {@inject: github:`open`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L33} method. This method will be called once when the source connector
-is initialized. In this method, you can retrieve all the connector specific settings through
-the passed `config` parameter, and initialize all the necessary resourcess.
For example, a Kafka
-connector can create the Kafka client in this `open` method.
+You can develop Pulsar source connectors and sink connectors.
-Beside the passed-in `config` object, the Pulsar runtime also provides a `SourceContext` for the
-connector to access runtime resources for tasks like collecting metrics. The implementation can
-save the `SourceContext` for futher usage.
+### Source
-```java
+Developing a source connector is to implement the {@inject: github:`Source`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java}
+interface, which means you need to implement the {@inject: github:`open`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L33} method and the {@inject: github:`record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java#L28} method.
+
+1. Implement the {@inject: github:`open`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L33} method.
+
+    ```java
     /**
-     * Open connector with configuration
-     *
-     * @param config initialization config
-     * @param sourceContext
-     * @throws Exception IO type exceptions when opening a connector
-     */
+    * Open connector with configuration
+    *
+    * @param config initialization config
+    * @param sourceContext
+    * @throws Exception IO type exceptions when opening a connector
+    */
     void open(final Map config, SourceContext sourceContext) throws Exception;
-```
+    ```
-The main task for a Source implementor is to implement {@inject: github:`read`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L41}
-method.
-```java
-    /**
-     * Reads the next message from source.
-     * If source does not have any new messages, this call should block.
-     * @return next message from source.
The return result should never be null
-     * @throws Exception
-     */
-    Record read() throws Exception;
-```
+    This method is called when the source connector is initialized.
+
+    In this method, you can retrieve all connector specific settings through the passed-in `config` parameter and initialize all necessary resources.
+
+    For example, a Kafka connector can create a Kafka client in this `open` method.
-The implementation should be blocking on this method if nothing to return. It should never return
-`null`. The returned {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java#L28} should encapsulates the information that is needed by
-Pulsar IO runtime.
+    Besides, Pulsar runtime also provides a `SourceContext` for the
+    connector to access runtime resources for tasks like collecting metrics. The implementation can save the `SourceContext` for future use.
-These information includes:
+2. Implement the {@inject: github:`read`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L41} method.
-- *Topic Name*: _Optional_. If the record is originated from a Pulsar topic, it should be the Pulsar topic name.
-- *Key*: _Optional_. If the record has a key associated with it.
-- *Value*: _Required_. The actual data of this record.
-- *Partition Id*: _Optional_. If the record is originated from a partitioned source,
-  return its partition id. The partition id will be used as part of the unique identifier
-  by Pulsar IO runtime to do message deduplication and achieve exactly-once processing guarantee.
-- *Record Sequence*: _Optional_. If the record is originated from a sequential source,
-  return its record sequence. The record sequence will be used as part of the unique identifier
-  by Pulsar IO runtime to do message deduplication and achieve exactly-once processing guarantee.
-- *Properties*: _Optional_. If the record carries user-defined properties, return those properties.
+    ```java
+    /**
+    * Reads the next message from source.
+    * If source does not have any new messages, this call should block.
+    * @return next message from source. The return result should never be null
+    * @throws Exception
+    */
+    Record read() throws Exception;
+    ```
-Additionally, the implemention of the record should provide two methods: `ack` and `fail`. These
-two methods will be used by Pulsar IO connector to acknowledge the records that it has done
-processing and fail the records that it has failed to process.
+    If nothing to return, the implementation should be blocking rather than returning `null`.
-{@inject: github:`KafkaSource`:/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java} is a good example to follow.
+    The returned {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java#L28} should encapsulate the following information, which is needed by Pulsar IO runtime.
-#### Develop a sink connector
+    * {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java#L28} should provide the following variables:
-Developing a sink connector is as easy as developing a source connector. You just need to
-implement {@inject: github:`Sink`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java} interface.
+      |Variable|Required|Description
+      |---|---|---
+      `TopicName`|No|Pulsar topic name from which the record is originated from.
+      `Key`|No| |
+      `Value`|Yes|Actual data of the record.
+      `EventTime`|No|Event time of the record from the source.
+      `PartitionId`|No| If the record is originated from a partitioned source, it returns its `PartitionId`.

`PartitionId` is used as a part of the unique identifier by Pulsar IO runtime to deduplicate messages and achieve exactly-once processing guarantee.
+      `RecordSequence`|No|If the record is originated from a sequential source, it returns its `RecordSequence`.

`RecordSequence` is used as a part of the unique identifier by Pulsar IO runtime to deduplicate messages and achieve exactly-once processing guarantee.
+      `Properties` |Yes| If the record carries user-defined properties, it returns those properties.

Review comment:
`Properties` also seems to be optional, so its `Required` column should probably read `No` rather than `Yes`.
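
To illustrate what "optional" means for a record field, here is a minimal sketch. It assumes a simplified stand-in interface: `SketchRecord`, `RecordSketch`, and `valueOnly` are hypothetical names invented for this example and are not the real `org.apache.pulsar.functions.api.Record`. The idea is that an accessor with a default implementation does not have to be supplied by the connector author, which is what a `No` in the `Required` column would express.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for the Record interface discussed above.
// It only models three of the fields from the table.
interface SketchRecord<T> {
    // Required: the only accessor without a default implementation.
    T getValue();

    // Optional: absent unless an implementation overrides it.
    default Optional<String> getKey() {
        return Optional.empty();
    }

    // Optional: an empty map unless an implementation overrides it.
    default Map<String, String> getProperties() {
        return Collections.emptyMap();
    }
}

class RecordSketch {
    // Builds a record that supplies only the required value.
    static SketchRecord<String> valueOnly(String value) {
        return () -> value;
    }

    public static void main(String[] args) {
        SketchRecord<String> record = valueOnly("hello");
        System.out.println("value: " + record.getValue());
        System.out.println("has key: " + record.getKey().isPresent());
        System.out.println("properties empty: " + record.getProperties().isEmpty());
    }
}
```

Under this assumption, a source that has no user-defined properties can leave `getProperties` alone and still return a valid record from `read`.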