pulsar-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] jerrypeng commented on a change in pull request #2275: [website] improve pulsar connector documentation
Date Wed, 01 Aug 2018 20:39:43 GMT
jerrypeng commented on a change in pull request #2275:  [website] improve pulsar connector
documentation
URL: https://github.com/apache/incubator-pulsar/pull/2275#discussion_r207023661
 
 

 ##########
 File path: site2/docs/io-develop.md
 ##########
 @@ -0,0 +1,195 @@
+---
+id: io-develop
+title: Develop Connectors
+sidebar_label: Developing Connectors
+---
+
+This guide describes how developers can write new connectors for Pulsar IO to move data
+between Pulsar and other systems.
+
+Pulsar IO connectors are specialized [Pulsar Functions](functions-overview.md), so writing
+a Pulsar IO connector is as simple as writing a Pulsar function. Pulsar IO connectors come
+in two flavors: {@inject: github:`Source`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java},
+which imports data from another system, and {@inject: github:`Sink`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java},
+which exports data to another system. For example, [KinesisSink](io-kinesis.md) exports
+the messages of a Pulsar topic to a Kinesis stream, and [RabbitmqSource](io-rabbitmq.md) imports
+the messages of a RabbitMQ queue into a Pulsar topic.
+
+### Developing
+
+#### Develop a source connector
+
+To develop a source connector, you need to implement the {@inject: github:`Source`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java}
+interface.
+
+First, you need to implement the {@inject: github:`open`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L33}
+method. This method is called once when the source connector is initialized. In this method,
+you can retrieve all the connector-specific settings through the passed-in `config` parameter
+and initialize all the necessary resources. For example, a Kafka connector can create the
+Kafka client in this `open` method.
+
+Besides the passed-in `config` object, the Pulsar runtime also provides a `SourceContext` for the
+connector to access runtime resources for tasks like collecting metrics. The implementation can
+save the `SourceContext` for further use.
+
+```java
+    /**
+     * Open connector with configuration
+     *
+     * @param config initialization config
+     * @param sourceContext
+     * @throws Exception IO type exceptions when opening a connector
+     */
+    void open(final Map<String, Object> config, SourceContext sourceContext) throws Exception;
+```
+
+The main task for a Source implementor is to implement the {@inject: github:`read`:/master/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L41}
+method.
+
+```java
+    /**
+     * Reads the next message from source.
+     * If source does not have any new messages, this call should block.
+     * @return next message from source.  The return result should never be null
+     * @throws Exception
+     */
+    Record<T> read() throws Exception;
+```
+
+The implementation should block in this method if there is nothing to return. It should never return
+`null`. The returned {@inject: github:`Record`:/master/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java#L28}
+should encapsulate the information that is needed by the Pulsar IO runtime.
+
+This information includes:
+
+- *Topic Name*: _Optional_. If the record originates from a Pulsar topic, this should be the Pulsar topic name.
+- *Key*: _Optional_. The key associated with the record, if it has one.
+- *Value*: _Required_. The actual data of this record.
+- *Partition Id*: _Optional_. If the record originates from a partitioned source,
+  return its partition id. The partition id is used as part of the unique identifier
+  by the Pulsar IO runtime to deduplicate messages and achieve an exactly-once processing guarantee.
+- *Record Sequence*: _Optional_. If the record originates from a sequential source,
+  return its record sequence. The record sequence is used as part of the unique identifier
+  by the Pulsar IO runtime to deduplicate messages and achieve an exactly-once processing guarantee.
+- *Properties*: _Optional_. If the record carries user-defined properties, return those properties.
+
+Additionally, the implementation of the record should provide two methods: `ack` and `fail`. These
+two methods are used by the Pulsar IO connector to acknowledge the records that it has finished
+processing and to fail the records that it has failed to process.
+
+{@inject: github:`KafkaSource`:/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java} is a good example to follow.
+
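As a rough illustration of the `open` and `read` contract described above, here is a minimal, self-contained sketch. It does not use the real Pulsar classes: `Record`, `SourceContext`, and `Source` below are simplified stand-ins that only mirror the signatures quoted in this guide, and `QueueSource`, its `offer` helper, and the `prefix` config key are hypothetical names invented for the example.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-ins for the Pulsar interfaces (assumption: the real
// Record, SourceContext, and Source types have more members than shown).
interface Record<T> {
    T getValue();
    default Optional<String> getKey() { return Optional.empty(); }
    default void ack() {}
    default void fail() {}
}

interface SourceContext {}

interface Source<T> extends AutoCloseable {
    void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;
    Record<T> read() throws Exception;
}

// Hypothetical source that hands out strings from an in-memory queue.
class QueueSource implements Source<String> {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private SourceContext context;
    private String prefix;

    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) {
        // Read connector-specific settings; "prefix" is a made-up config key.
        this.prefix = String.valueOf(config.getOrDefault("prefix", ""));
        // Keep the context around for later use, for example metrics.
        this.context = sourceContext;
    }

    // Stand-in for the external system delivering data to the connector.
    void offer(String value) {
        queue.add(value);
    }

    @Override
    public Record<String> read() throws InterruptedException {
        // take() blocks until an element is available, so read() never returns null.
        final String value = prefix + queue.take();
        return () -> value;
    }

    @Override
    public void close() {
        queue.clear();
    }
}
```

In this sketch the blocking behavior comes entirely from `BlockingQueue.take()`. A real connector would typically block on its client library instead and would override `ack` and `fail` on the returned record.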
+#### Develop a sink connector
+
+Developing a sink connector is as easy as developing a source connector. You just need to
+implement the {@inject: github:`Sink`:/master/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java} interface.
+
+Similarly, you first need to implement the {@inject: github:`open`:/master/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L36} method to initialize all the necessary resources
+before implementing the {@inject: github:`write`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L44} method.
+
+```java
+    /**
+     * Open connector with configuration
+     *
+     * @param config initialization config
+     * @param sinkContext
+     * @throws Exception IO type exceptions when opening a connector
+     */
+    void open(final Map<String, Object> config, SinkContext sinkContext) throws Exception;
+```
+
+The main task for a Sink implementor is to implement the {@inject: github:`write`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L44} method.
+
+```java
+    /**
+     * Write a message to Sink
+     * @param record record to write to sink
+     * @throws Exception
+     */
+    void write(Record<T> record) throws Exception;
+```
+
+In the implementation of the `write` method, the implementor can decide how to write the value and
+the optional key to the external system, and can leverage the provided information, such as
+`Partition Id` and `Record Sequence`, to achieve different processing guarantees. The implementor
+is also responsible for acknowledging records that it has successfully written and failing
+records that it has failed to write.
+
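The `write` contract, including acknowledging or failing each record, could look roughly like the following self-contained sketch. As before, `Record`, `SinkContext`, and `Sink` are simplified stand-ins for the real Pulsar interfaces, and `ListSink`, its `written` accessor, and the `maxLength` config key are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified stand-ins for the Pulsar interfaces (assumption: the real
// Record, SinkContext, and Sink types have more members than shown).
interface Record<T> {
    T getValue();
    default Optional<String> getKey() { return Optional.empty(); }
    default void ack() {}
    default void fail() {}
}

interface SinkContext {}

interface Sink<T> extends AutoCloseable {
    void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;
    void write(Record<T> record) throws Exception;
}

// Hypothetical sink that "writes" to an in-memory list instead of an external system.
class ListSink implements Sink<String> {
    private final List<String> target = new ArrayList<>();
    private int maxLength;

    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) {
        // "maxLength" is a made-up config key limiting the accepted value size.
        Object limit = config.getOrDefault("maxLength", Integer.MAX_VALUE);
        this.maxLength = ((Number) limit).intValue();
    }

    @Override
    public void write(Record<String> record) {
        String value = record.getValue();
        if (value == null || value.length() > maxLength) {
            // The value cannot be written, so fail the record.
            record.fail();
            return;
        }
        // Write the optional key together with the value, then acknowledge.
        String prefix = record.getKey().map(k -> k + "=").orElse("");
        target.add(prefix + value);
        record.ack();
    }

    // Exposes what has been written so far (for inspection in tests).
    List<String> written() {
        return target;
    }

    @Override
    public void close() {
        target.clear();
    }
}
```

Here each record is acknowledged only after it has been stored, and rejected values are failed rather than silently dropped. A real sink would apply the same pattern around calls to its external client.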
+### Testing
+
+Testing connectors can be challenging because Pulsar IO connectors interact with two systems
+that may be difficult to mock: Pulsar and the system the connector is connecting to. We
+recommend writing unit tests that specifically test the functionality of the connector classes
+while mocking the external services.
+
+Once you have written sufficient unit tests for your connector, we also recommend adding
+separate integration tests to verify end-to-end functionality. In Pulsar, we use
+[testcontainers](https://www.testcontainers.org/) for all Pulsar integration tests. The Pulsar IO
+{@inject: github:`IntegrationTests`:/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io} are good examples to follow when integration testing your connectors.
+
+### Packaging
+
+Once you've developed and tested your connector, you must package it so that it can be submitted
+to a [Pulsar Functions](functions-overview.md) cluster. The two approaches described
+here work with the Pulsar Functions runtime.
+
+If you plan to package and distribute your connector for others to use, you are obligated to
+properly license and copyright your own code and to adhere to the licensing and copyrights of
+all libraries your code uses and that you include in your distribution. If you are using the
+approach described in ["Creating a NAR package"](#creating-a-nar-package), the NAR plugin will
+automatically create a `DEPENDENCIES` file in the generated NAR package, including the proper
+licensing and copyrights of all libraries of your connector.
+
+#### Creating a NAR package
+
+The most easiest approach to packaging a Pulsar IO connector is to create a NAR package using
 
 Review comment:
   delete "most"

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
