pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: [doc] Generate connector yaml files for connectors (#3065)
Date Tue, 27 Nov 2018 18:08:37 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 08ef448  [doc] Generate connector yaml files for connectors (#3065)
08ef448 is described below

commit 08ef448c1c7e70db345092c2b56662c7b6f888fe
Author: Sijie Guo <guosijie@gmail.com>
AuthorDate: Tue Nov 27 10:08:32 2018 -0800

    [doc] Generate connector yaml files for connectors (#3065)
    
    *Motivation*
    
    Include the example yaml files in the io distribution package
---
 distribution/io/pom.xml                            |  1 +
 .../org/apache/pulsar/io/canal/CanalSource.java    |  9 +++-
 .../apache/pulsar/io/canal/CanalSourceConfig.java  | 41 +++++++++++++++-
 .../pulsar/io/cassandra/CassandraSinkConfig.java   | 21 +++++++++
 .../pulsar/io/cassandra/CassandraStringSink.java   |  7 +++
 pulsar-io/docs/pom.xml                             | 45 ++++++++++++++++++
 .../io/elasticsearch/ElasticSearchConfig.java      | 31 +++++++++++++
 .../pulsar/io/elasticsearch/ElasticSearchSink.java |  8 ++++
 .../apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java  |  8 ++++
 .../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java  | 31 +++++++++++++
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  |  8 ++++
 .../pulsar/io/kinesis/KinesisSinkConfig.java       | 54 +++++++++++++++++++++-
 .../apache/pulsar/io/rabbitmq/RabbitMQConfig.java  | 16 +++++++
 .../apache/pulsar/io/rabbitmq/RabbitMQSource.java  |  7 +++
 .../apache/pulsar/io/twitter/TwitterFireHose.java  |  8 ++++
 .../pulsar/io/twitter/TwitterFireHoseConfig.java   | 44 +++++++++++++++++-
 16 files changed, 334 insertions(+), 5 deletions(-)

diff --git a/distribution/io/pom.xml b/distribution/io/pom.xml
index 67e3509..125fe70 100644
--- a/distribution/io/pom.xml
+++ b/distribution/io/pom.xml
@@ -79,6 +79,7 @@
             </goals>
             <configuration>
               <executable>${project.basedir}/../../src/pulsar-io-gen</executable>
+              <outputFile>${project.basedir}/target/pulsar-io-gen.output</outputFile>
               <arguments>
                 <argument>conf</argument>
                 <argument>-o</argument>
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java
index 9021b23..c348964 100644
--- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java
@@ -30,6 +30,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 import org.slf4j.MDC;
 
 import java.net.InetSocketAddress;
@@ -39,8 +41,13 @@ import java.util.Objects;
 import java.util.Optional;
 
 /**
- * A Simple class for mysql binlog sync to pulsar
+ * A Simple class for mysql binlog sync to pulsar.
  */
+@Connector(
+    name = "canal",
+    type = IOType.SOURCE,
+    help = "The CanalSource is used for syncing mysql binlog to Pulsar.",
+    configClass = CanalSourceConfig.class)
 @Slf4j
 public class CanalSource extends PushSource<byte[]> {
 
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
index 87cbc9f..1af70c1 100644
--- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
@@ -27,8 +27,12 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 
+/**
+ * Canal source config.
+ */
 @Data
 @Setter
 @Getter
@@ -37,14 +41,47 @@ import java.util.Map;
 @Accessors(chain = true)
 public class CanalSourceConfig implements Serializable{
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Username to connect to mysql database")
     private String username;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Password to connect to mysql database")
     private String password;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Source destination that Canal source connector connects to")
     private String destination;
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The mysql database hostname")
     private String singleHostname;
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The mysql database port")
     private int singlePort;
-    private Boolean cluster;
+    @FieldDoc(
+        required = true,
+        defaultValue = "false",
+        help = "If setting to true, it will be talking to `zkServers` to figure out the actual
database hosts."
+            + " If setting to false, it will connect to the database specified by `singleHostname`
and `singlePort`.")
+    private Boolean cluster = false;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The zookeeper servers that canal source connector talks to figure out the
actual database hosts")
     private String zkServers;
-    private int batchSize;
+    @FieldDoc(
+        required = false,
+        defaultValue = "1000",
+        help = "The batch size to fetch from canal.")
+    private int batchSize = 1000;
 
     public static CanalSourceConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSinkConfig.java b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSinkConfig.java
index c3f6587..2406952 100644
--- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSinkConfig.java
+++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSinkConfig.java
@@ -28,6 +28,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
 @Setter
@@ -39,10 +40,30 @@ public class CassandraSinkConfig implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "A comma-separated list of cassandra hosts to connect to")
     private String roots;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The key space used for writing pulsar messages to")
     private String keyspace;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The key name of the cassandra column family")
     private String keyname;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The cassandra column family name")
     private String columnFamily;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The column name of the cassandra column family")
     private String columnName;
 
     public static CassandraSinkConfig load(String yamlFile) throws IOException {
diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
index 4e7feb5..694b79b 100644
--- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
+++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
@@ -21,11 +21,18 @@ package org.apache.pulsar.io.cassandra;
 
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 
 /**
  * Cassandra sink that treats incoming messages on the input topic as Strings
  * and write identical key/value pairs.
  */
+@Connector(
+    name = "cassandra",
+    type = IOType.SINK,
+    help = "The CassandraStringSink is used for moving messages from Pulsar to Cassandra.",
+    configClass = CassandraSinkConfig.class)
 public class CassandraStringSink extends CassandraAbstractSink<String, String> {
     @Override
     public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
diff --git a/pulsar-io/docs/pom.xml b/pulsar-io/docs/pom.xml
index 04fffee..04d44c8 100644
--- a/pulsar-io/docs/pom.xml
+++ b/pulsar-io/docs/pom.xml
@@ -49,9 +49,54 @@
     <!-- include connectors -->
     <dependency>
       <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-canal</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-cassandra</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-data-generator</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-elastic-search</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-hdfs</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-jdbc</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-kafka</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-kinesis</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-rabbitmq</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-twitter</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
index 8b54353..f929f97 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
@@ -31,6 +31,7 @@ import lombok.Setter;
 import lombok.ToString;
 import lombok.experimental.Accessors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 /**
  * Configuration class for the ElasticSearch Sink Connector.
@@ -45,16 +46,46 @@ public class ElasticSearchConfig implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The url of elastic search cluster that the connector connects to"
+    )
     private String elasticSearchUrl;
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The index name that the connector writes messages to"
+    )
     private String indexName;
 
+    @FieldDoc(
+        required = false,
+        defaultValue = "1",
+        help = "The number of shards of the index"
+    )
     private int indexNumberOfShards = 1;
 
+    @FieldDoc(
+        required = false,
+        defaultValue = "1",
+        help = "The number of replicas of the index"
+    )
     private int indexNumberOfReplicas = 1;
 
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The username used by the connector to connect to the elastic search cluster.
If username is set, a password should also be provided."
+    )
     private String username;
 
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The password used by the connector to connect to the elastic search cluster.
If password is set, a username should also be provided"
+    )
     private String password;
 
     public static ElasticSearchConfig load(String yamlFile) throws IOException {
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index 86546f3..dca1504 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -33,6 +33,8 @@ import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -51,6 +53,12 @@ import org.elasticsearch.common.xcontent.XContentType;
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
+@Connector(
+    name = "elastic_search",
+    type = IOType.SINK,
+    help = "A sink connector that sends pulsar messages to elastic search",
+    configClass = ElasticSearchConfig.class
+)
 public class ElasticSearchSink implements Sink<byte[]> {
 
     protected static final String DOCUMENT = "doc";
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
index c18dc81..640972a 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
@@ -23,11 +23,19 @@ import java.sql.PreparedStatement;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
 
 /**
  * A Simple Jdbc sink, which interprets input Record in generic record.
  */
+@Connector(
+    name = "jdbc",
+    type = IOType.SINK,
+    help = "A simple JDBC sink that writes pulser messages to a database table",
+    configClass = JdbcSinkConfig.class
+)
 @Slf4j
 public class JdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {
 
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index 6cc95d6..b0508ba 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -28,6 +28,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
 @Setter
@@ -39,13 +40,43 @@ public class JdbcSinkConfig implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "Username used to connect to the database specified by `jdbcUrl`"
+    )
     private String userName;
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "Password used to connect to the database specified by `jdbcUrl`"
+    )
     private String password;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The JDBC url of the database this connector connects to"
+    )
     private String jdbcUrl;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The name of the table this connector writes messages to"
+    )
     private String tableName;
 
     // Optional
+    @FieldDoc(
+        required = false,
+        defaultValue = "500",
+        help = "The jdbc operation timeout in milliseconds"
+    )
     private int timeoutMs = 500;
+    @FieldDoc(
+        required = false,
+        defaultValue = "200",
+        help = "The batch size of updates made to the database"
+    )
     private int batchSize = 200;
 
     public static JdbcSinkConfig load(String yamlFile) throws IOException {
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 1056f57..5d4c8a0 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -51,6 +51,8 @@ import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,6 +82,12 @@ import org.slf4j.LoggerFactory;
  *
  *
  */
+@Connector(
+    name = "kinesis",
+    type = IOType.SINK,
+    help = "A sink connector that copies messages from Pulsar to Kinesis",
+    configClass = KinesisSinkConfig.class
+)
 public class KinesisSink implements Sink<byte[]> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KinesisSink.class);
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index ba476ab..3345026 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -28,6 +28,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
 @Setter
@@ -39,13 +40,64 @@ public class KinesisSinkConfig implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Kinesis end-point url. It can be found at https://docs.aws.amazon.com/general/latest/gr/rande.html"
+    )
     private String awsEndpoint;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Appropriate aws region. E.g. us-west-1, us-west-2"
+    )
     private String awsRegion;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Kinesis stream name"
+    )
     private String awsKinesisStreamName;
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "Fully-Qualified class name of implementation of AwsCredentialProviderPlugin."
+            + " It is a factory class which creates an AWSCredentialsProvider that will be
used by Kinesis Sink."
+            + " If it is empty then KinesisSink will create a default AWSCredentialsProvider
which accepts json-map"
+            + " of credentials in `awsCredentialPluginParam`")
     private String awsCredentialPluginName;
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "json-parameters to initialize `AwsCredentialsProviderPlugin`")
     private String awsCredentialPluginParam;
+    @FieldDoc(
+        required = true,
+        defaultValue = "ONLY_RAW_PAYLOAD",
+        help = "Message format in which kinesis sink converts pulsar messages and publishes
to kinesis streams.\n"
+            + "  #\n"
+            + "  # The available messages formats are: \n"
+            + "  #\n"
+            + "  # - ONLY_RAW_PAYLOAD \n"
+            + "  #\n"
+            + "  #   Kinesis sink directly publishes pulsar message payload as a message
into the configured kinesis stream. \n"
+            + "  #\n"
+            + "  # - FULL_MESSAGE_IN_JSON \n"
+            + "  #\n"
+            + "  #   Kinesis sink creates a json payload with pulsar message payload, properties
and encryptionCtx, \n"
+            + "  #   and publishes json payload into the configured kinesis stream.\n"
+            + "  #\n"
+            + "  # - FULL_MESSAGE_IN_FB \n"
+            + "  #\n"
+            + "  #   Kinesis sink creates a flatbuffer serialized paylaod with pulsar message
payload, \n"
+            + "  #   properties and encryptionCtx, and publishes flatbuffer payload into
the configured kinesis stream."
+    )
     private MessageFormat messageFormat = MessageFormat.ONLY_RAW_PAYLOAD; // default : ONLY_RAW_PAYLOAD
-    private boolean retainOrdering;
+    @FieldDoc(
+        required = false,
+        defaultValue = "false",
+        help = "A flag to tell Pulsar IO to retain ordering when moving messages from Pulsar
to Kinesis")
+    private boolean retainOrdering = false;
 
     public static KinesisSinkConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java
index 33bc9c1..f4fd61f 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java
@@ -28,7 +28,11 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
+/**
+ * RabbitMQ source connector config.
+ */
 @Data
 @Setter
 @Getter
@@ -39,8 +43,20 @@ public class RabbitMQConfig implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The connection name used for connecting to RabbitMQ")
     private String connectionName;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The AMQ uri used for connecting to RabbitMQ")
     private String amqUri;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The RabbitMQ queue name")
     private String queueName;
 
     public static RabbitMQConfig load(String yamlFile) throws IOException {
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index 2277cbb..6bb9613 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -35,12 +35,19 @@ import lombok.Data;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * A simple connector to consume messages from a RabbitMQ queue
  */
+@Connector(
+    name = "rabbitmq",
+    type = IOType.SOURCE,
+    help = "A simple connector to move messages from a RabbitMQ queue to a Pulsar topic",
+    configClass = RabbitMQConfig.class)
 public class RabbitMQSource extends PushSource<byte[]> {
 
     private static Logger logger = LoggerFactory.getLogger(RabbitMQSource.class);
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index fc61945..e1d0545 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -43,12 +43,20 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Simple Push based Twitter FireHose Source
  */
+@Connector(
+    name = "twitter",
+    type = IOType.SOURCE,
+    help = "A simple connector moving tweets from Twitter FireHose to Pulsar",
+    configClass = TwitterFireHoseConfig.class
+)
 @Slf4j
 public class TwitterFireHose extends PushSource<TweetData> {
 
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
index 88acb33..c9bf4df 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import com.twitter.hbc.core.Constants;
 import lombok.*;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
 @Setter
@@ -40,18 +41,59 @@ public class TwitterFireHoseConfig implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Your twitter app consumer key. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens
for details"
+    )
     private String consumerKey;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Your twitter app consumer secret. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens
for details"
+    )
     private String consumerSecret;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Your twitter app token. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens
for details"
+    )
     private String token;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Your twitter app token secret. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens
for details"
+    )
     private String tokenSecret;
     // Most firehose events have null createdAt time. If this parameter is set to true
     // then we estimate the createdTime of each firehose event to be current time.
+    @FieldDoc(
+        required = false,
+        defaultValue = "false",
+        help = "Most firehose events have null createdAt time."
+            + " If this parameter is set to true, the connector estimates the createdTime
of each firehose event to be current time."
+    )
     private Boolean guestimateTweetTime = false;
 
     // ------ Optional property keys
 
-    private String clientName = "openconnector-twitter-source";
+    @FieldDoc(
+        required = false,
+        defaultValue = "pulsario-twitter-source",
+        help = "The Twitter Firehose Client name"
+    )
+    private String clientName = "pulsario-twitter-source";
+    @FieldDoc(
+        required = false,
+        defaultValue = Constants.STREAM_HOST,
+        help = "The Twitter Firehose stream hosts that the connector connects to"
+    )
     private String clientHosts = Constants.STREAM_HOST;
+    @FieldDoc(
+        required = false,
+        defaultValue = "50000",
+        help = "The Twitter Firehose client buffer size"
+    )
     private int clientBufferSize = 50000;
 
     public static TwitterFireHoseConfig load(String yamlFile) throws IOException {


Mime
View raw message