pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] merlimat closed pull request #3065: [doc] Generate connector yaml files for connectors
Date Tue, 27 Nov 2018 18:08:34 GMT
merlimat closed pull request #3065: [doc] Generate connector yaml files for connectors
URL: https://github.com/apache/pulsar/pull/3065
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/distribution/io/pom.xml b/distribution/io/pom.xml
index 67e35096f5..125fe7037d 100644
--- a/distribution/io/pom.xml
+++ b/distribution/io/pom.xml
@@ -79,6 +79,7 @@
             </goals>
             <configuration>
               <executable>${project.basedir}/../../src/pulsar-io-gen</executable>
+              <outputFile>${project.basedir}/target/pulsar-io-gen.output</outputFile>
               <arguments>
                 <argument>conf</argument>
                 <argument>-o</argument>
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java
index 9021b23a64..c348964ded 100644
--- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java
@@ -30,6 +30,8 @@
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 import org.slf4j.MDC;
 
 import java.net.InetSocketAddress;
@@ -39,8 +41,13 @@
 import java.util.Optional;
 
 /**
- * A Simple class for mysql binlog sync to pulsar
+ * A Simple class for mysql binlog sync to pulsar.
  */
+@Connector(
+    name = "canal",
+    type = IOType.SOURCE,
+    help = "The CanalSource is used for syncing mysql binlog to Pulsar.",
+    configClass = CanalSourceConfig.class)
 @Slf4j
 public class CanalSource extends PushSource<byte[]> {
 
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
index 87cbc9f4c6..1af70c1280 100644
--- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
@@ -27,8 +27,12 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 
+/**
+ * Canal source config.
+ */
 @Data
 @Setter
 @Getter
@@ -37,14 +41,47 @@
 @Accessors(chain = true)
 public class CanalSourceConfig implements Serializable{
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Username to connect to mysql database")
     private String username;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Password to connect to mysql database")
     private String password;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Source destination that Canal source connector connects to")
     private String destination;
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The mysql database hostname")
     private String singleHostname;
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The mysql database port")
     private int singlePort;
-    private Boolean cluster;
+    @FieldDoc(
+        required = true,
+        defaultValue = "false",
+        help = "If setting to true, it will be talking to `zkServers` to figure out the actual
database hosts."
+            + " If setting to false, it will connect to the database specified by `singleHostname`
and `singlePort`.")
+    private Boolean cluster = false;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The zookeeper servers that canal source connector talks to figure out the
actual database hosts")
     private String zkServers;
-    private int batchSize;
+    @FieldDoc(
+        required = false,
+        defaultValue = "1000",
+        help = "The batch size to fetch from canal.")
+    private int batchSize = 1000;
 
     public static CanalSourceConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSinkConfig.java
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSinkConfig.java
index c3f6587e06..24069524b6 100644
--- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSinkConfig.java
+++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSinkConfig.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
 @Setter
@@ -39,10 +40,30 @@
 
     private static final long serialVersionUID = 1L;
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "A comma-separated list of cassandra hosts to connect to")
     private String roots;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The key space used for writing pulsar messages to")
     private String keyspace;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The key name of the cassandra column family")
     private String keyname;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The cassandra column family name")
     private String columnFamily;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The column name of the cassandra column family")
     private String columnName;
 
     public static CassandraSinkConfig load(String yamlFile) throws IOException {
diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
index 4e7feb5b6e..694b79b055 100644
--- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
+++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
@@ -21,11 +21,18 @@
 
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 
 /**
  * Cassandra sink that treats incoming messages on the input topic as Strings
  * and write identical key/value pairs.
  */
+@Connector(
+    name = "cassandra",
+    type = IOType.SINK,
+    help = "The CassandraStringSink is used for moving messages from Pulsar to Cassandra.",
+    configClass = CassandraSinkConfig.class)
 public class CassandraStringSink extends CassandraAbstractSink<String, String> {
     @Override
     public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
diff --git a/pulsar-io/docs/pom.xml b/pulsar-io/docs/pom.xml
index 04fffeee58..04d44c899e 100644
--- a/pulsar-io/docs/pom.xml
+++ b/pulsar-io/docs/pom.xml
@@ -47,11 +47,56 @@
     </dependency>
 
     <!-- include connectors -->
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-canal</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-cassandra</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-data-generator</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-elastic-search</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-hdfs</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-jdbc</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-kafka</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-kinesis</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-rabbitmq</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-twitter</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
index 8b54353c86..f929f9727e 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
@@ -31,6 +31,7 @@
 import lombok.ToString;
 import lombok.experimental.Accessors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 /**
  * Configuration class for the ElasticSearch Sink Connector.
@@ -45,16 +46,46 @@
 
     private static final long serialVersionUID = 1L;
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The url of elastic search cluster that the connector connects to"
+    )
     private String elasticSearchUrl;
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The index name that the connector writes messages to"
+    )
     private String indexName;
 
+    @FieldDoc(
+        required = false,
+        defaultValue = "1",
+        help = "The number of shards of the index"
+    )
     private int indexNumberOfShards = 1;
 
+    @FieldDoc(
+        required = false,
+        defaultValue = "1",
+        help = "The number of replicas of the index"
+    )
     private int indexNumberOfReplicas = 1;
 
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The username used by the connector to connect to the elastic search cluster.
If username is set, a password should also be provided."
+    )
     private String username;
 
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The password used by the connector to connect to the elastic search cluster.
If password is set, a username should also be provided"
+    )
     private String password;
 
     public static ElasticSearchConfig load(String yamlFile) throws IOException {
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index 86546f3c96..dca1504d82 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -33,6 +33,8 @@
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -51,6 +53,12 @@
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
+@Connector(
+    name = "elastic_search",
+    type = IOType.SINK,
+    help = "A sink connector that sends pulsar messages to elastic search",
+    configClass = ElasticSearchConfig.class
+)
 public class ElasticSearchSink implements Sink<byte[]> {
 
     protected static final String DOCUMENT = "doc";
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
index c18dc81b82..640972a77b 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
@@ -23,11 +23,19 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
 
 /**
  * A Simple Jdbc sink, which interprets input Record in generic record.
  */
+@Connector(
+    name = "jdbc",
+    type = IOType.SINK,
+    help = "A simple JDBC sink that writes pulser messages to a database table",
+    configClass = JdbcSinkConfig.class
+)
 @Slf4j
 public class JdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {
 
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index 6cc95d6bac..b0508ba50f 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
 @Setter
@@ -39,13 +40,43 @@
 
     private static final long serialVersionUID = 1L;
 
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "Username used to connect to the database specified by `jdbcUrl`"
+    )
     private String userName;
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "Password used to connect to the database specified by `jdbcUrl`"
+    )
     private String password;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The JDBC url of the database this connector connects to"
+    )
     private String jdbcUrl;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The name of the table this connector writes messages to"
+    )
     private String tableName;
 
     // Optional
+    @FieldDoc(
+        required = false,
+        defaultValue = "500",
+        help = "The jdbc operation timeout in milliseconds"
+    )
     private int timeoutMs = 500;
+    @FieldDoc(
+        required = false,
+        defaultValue = "200",
+        help = "The batch size of updates made to the database"
+    )
     private int batchSize = 200;
 
     public static JdbcSinkConfig load(String yamlFile) throws IOException {
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 1056f576b2..5d4c8a0a3c 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -51,6 +51,8 @@
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,6 +82,12 @@
  *
  *
  */
+@Connector(
+    name = "kinesis",
+    type = IOType.SINK,
+    help = "A sink connector that copies messages from Pulsar to Kinesis",
+    configClass = KinesisSinkConfig.class
+)
 public class KinesisSink implements Sink<byte[]> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KinesisSink.class);
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index ba476ab099..3345026390 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
 @Setter
@@ -39,13 +40,64 @@
 
     private static final long serialVersionUID = 1L;
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Kinesis end-point url. It can be found at https://docs.aws.amazon.com/general/latest/gr/rande.html"
+    )
     private String awsEndpoint;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Appropriate aws region. E.g. us-west-1, us-west-2"
+    )
     private String awsRegion;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Kinesis stream name"
+    )
     private String awsKinesisStreamName;
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "Fully-Qualified class name of implementation of AwsCredentialProviderPlugin."
+            + " It is a factory class which creates an AWSCredentialsProvider that will be
used by Kinesis Sink."
+            + " If it is empty then KinesisSink will create a default AWSCredentialsProvider
which accepts json-map"
+            + " of credentials in `awsCredentialPluginParam`")
     private String awsCredentialPluginName;
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "json-parameters to initialize `AwsCredentialsProviderPlugin`")
     private String awsCredentialPluginParam;
+    @FieldDoc(
+        required = true,
+        defaultValue = "ONLY_RAW_PAYLOAD",
+        help = "Message format in which kinesis sink converts pulsar messages and publishes
to kinesis streams.\n"
+            + "  #\n"
+            + "  # The available messages formats are: \n"
+            + "  #\n"
+            + "  # - ONLY_RAW_PAYLOAD \n"
+            + "  #\n"
+            + "  #   Kinesis sink directly publishes pulsar message payload as a message
into the configured kinesis stream. \n"
+            + "  #\n"
+            + "  # - FULL_MESSAGE_IN_JSON \n"
+            + "  #\n"
+            + "  #   Kinesis sink creates a json payload with pulsar message payload, properties
and encryptionCtx, \n"
+            + "  #   and publishes json payload into the configured kinesis stream.\n"
+            + "  #\n"
+            + "  # - FULL_MESSAGE_IN_FB \n"
+            + "  #\n"
+            + "  #   Kinesis sink creates a flatbuffer serialized paylaod with pulsar message
payload, \n"
+            + "  #   properties and encryptionCtx, and publishes flatbuffer payload into
the configured kinesis stream."
+    )
     private MessageFormat messageFormat = MessageFormat.ONLY_RAW_PAYLOAD; // default : ONLY_RAW_PAYLOAD
-    private boolean retainOrdering;
+    @FieldDoc(
+        required = false,
+        defaultValue = "false",
+        help = "A flag to tell Pulsar IO to retain ordering when moving messages from Pulsar
to Kinesis")
+    private boolean retainOrdering = false;
 
     public static KinesisSinkConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java
index 33bc9c12cf..f4fd61f141 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java
@@ -28,7 +28,11 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
+/**
+ * RabbitMQ source connector config.
+ */
 @Data
 @Setter
 @Getter
@@ -39,8 +43,20 @@
 
     private static final long serialVersionUID = 1L;
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The connection name used for connecting to RabbitMQ")
     private String connectionName;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The AMQ uri used for connecting to RabbitMQ")
     private String amqUri;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The RabbitMQ queue name")
     private String queueName;
 
     public static RabbitMQConfig load(String yamlFile) throws IOException {
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index 2277cbb5f6..6bb96135c5 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -35,12 +35,19 @@
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * A simple connector to consume messages from a RabbitMQ queue
  */
+@Connector(
+    name = "rabbitmq",
+    type = IOType.SOURCE,
+    help = "A simple connector to move messages from a RabbitMQ queue to a Pulsar topic",
+    configClass = RabbitMQConfig.class)
 public class RabbitMQSource extends PushSource<byte[]> {
 
     private static Logger logger = LoggerFactory.getLogger(RabbitMQSource.class);
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index fc61945449..e1d0545d93 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -43,12 +43,20 @@
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Simple Push based Twitter FireHose Source
  */
+@Connector(
+    name = "twitter",
+    type = IOType.SOURCE,
+    help = "A simple connector moving tweets from Twitter FireHose to Pulsar",
+    configClass = TwitterFireHoseConfig.class
+)
 @Slf4j
 public class TwitterFireHose extends PushSource<TweetData> {
 
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
index 88acb3300d..c9bf4dfd44 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
@@ -29,6 +29,7 @@
 import com.twitter.hbc.core.Constants;
 import lombok.*;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
 @Setter
@@ -40,18 +41,59 @@
 
     private static final long serialVersionUID = 1L;
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Your twitter app consumer key. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens
for details"
+    )
     private String consumerKey;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Your twitter app consumer secret. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens
for details"
+    )
     private String consumerSecret;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Your twitter app token. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens
for details"
+    )
     private String token;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Your twitter app token secret. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens
for details"
+    )
     private String tokenSecret;
     // Most firehose events have null createdAt time. If this parameter is set to true
     // then we estimate the createdTime of each firehose event to be current time.
+    @FieldDoc(
+        required = false,
+        defaultValue = "false",
+        help = "Most firehose events have null createdAt time."
+            + " If this parameter is set to true, the connector estimates the createdTime
of each firehose event to be current time."
+    )
     private Boolean guestimateTweetTime = false;
 
     // ------ Optional property keys
 
-    private String clientName = "openconnector-twitter-source";
+    @FieldDoc(
+        required = false,
+        defaultValue = "pulsario-twitter-source",
+        help = "The Twitter Firehose Client name"
+    )
+    private String clientName = "pulsario-twitter-source";
+    @FieldDoc(
+        required = false,
+        defaultValue = Constants.STREAM_HOST,
+        help = "The Twitter Firehose stream hosts that the connector connects to"
+    )
     private String clientHosts = Constants.STREAM_HOST;
+    @FieldDoc(
+        required = false,
+        defaultValue = "50000",
+        help = "The Twitter Firehose client buffer size"
+    )
     private int clientBufferSize = 50000;
 
     public static TwitterFireHoseConfig load(String yamlFile) throws IOException {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message