pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: [io][docs] introduce annotations for generating connector yaml config files (#2936)
Date Sun, 25 Nov 2018 19:41:30 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c8f081  [io][docs] introduce annotations for generating connector yaml config files
(#2936)
2c8f081 is described below

commit 2c8f0810a3c55e8db884f1d04b533f606c9e8409
Author: Sijie Guo <guosijie@gmail.com>
AuthorDate: Sun Nov 25 11:41:26 2018 -0800

    [io][docs] introduce annotations for generating connector yaml config files (#2936)
    
    
    *Motivation*
    
    Currently, none of the IO connectors ship example YAML config files, and writing those files by hand is error-prone.
    We need a programmatic way to automatically generate example connector YAML files.
    
    *Changes*
    
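    - introduce annotations (`@Connector`, `@FieldDoc`) for documenting connectors and their config fields.
    - provide a generator that produces example connector YAML files from those annotations
    - provide a shell script to run the generator
    - generate the YAML configs when building the IO distribution package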
---
 distribution/io/pom.xml                            |  45 ++++++
 distribution/io/src/assemble/io.xml                |   6 +
 pom.xml                                            |   8 +
 .../pulsar/io/core/annotations/Connector.java      |  61 ++++++++
 .../pulsar/io/core/annotations/FieldDoc.java       |  54 +++++++
 .../apache/pulsar/io/core/annotations/IOType.java  |  36 +++++
 {distribution/io => pulsar-io/docs}/pom.xml        |  53 +++----
 .../pulsar/io/docs/ConnectorDocGenerator.java      | 173 +++++++++++++++++++++
 .../apache/pulsar/io/kafka/KafkaAbstractSink.java  |  24 +--
 .../pulsar/io/kafka/KafkaAbstractSource.java       |  29 ++--
 .../org/apache/pulsar/io/kafka/KafkaBytesSink.java |   8 +
 .../apache/pulsar/io/kafka/KafkaBytesSource.java   |   8 +
 .../apache/pulsar/io/kafka/KafkaSinkConfig.java    |  41 ++++-
 .../apache/pulsar/io/kafka/KafkaSourceConfig.java  |  51 +++++-
 pulsar-io/pom.xml                                  |   1 +
 src/pulsar-io-gen                                  | 138 ++++++++++++++++
 16 files changed, 676 insertions(+), 60 deletions(-)

diff --git a/distribution/io/pom.xml b/distribution/io/pom.xml
index 0941913..67e3509 100644
--- a/distribution/io/pom.xml
+++ b/distribution/io/pom.xml
@@ -39,11 +39,56 @@
       <artifactId>pulsar-io-core</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-io-docs</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 
   <build>
     <plugins>
       <plugin>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create-conf-dir</id>
+            <phase>compile</phase>
+            <configuration>
+              <tasks>
+                <mkdir dir="target/conf"/>
+              </tasks>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>${exec-maven-plugin.version}</version>
+        <executions>
+          <execution>
+            <id>io-conf-gen</id>
+            <phase>package</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <executable>${project.basedir}/../../src/pulsar-io-gen</executable>
+              <arguments>
+                <argument>conf</argument>
+                <argument>-o</argument>
+                <argument>${project.basedir}/target/conf</argument>
+              </arguments>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
         <artifactId>maven-assembly-plugin</artifactId>
         <executions>
           <execution>
diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index 1a190a4..a0704c3 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -27,6 +27,12 @@
     <format>tar.gz</format>
   </formats>
   <includeBaseDirectory>true</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/../../distribution/io/target/conf</directory>
+      <outputDirectory>conf</outputDirectory>
+    </fileSet>
+  </fileSets>
   <files>
     <file>
       <source>${basedir}/../../LICENSE</source>
diff --git a/pom.xml b/pom.xml
index da02e5b..a2b43c2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,6 +156,7 @@ flexible messaging model and an intuitive client API.</description>
     <log4j2.version>2.10.0</log4j2.version>
     <bouncycastle.version>1.55</bouncycastle.version>
     <jackson.version>2.9.7</jackson.version>
+    <reflections.version>0.9.11</reflections.version>
     <swagger.version>1.5.21</swagger.version>
     <puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>
     <dockerfile-maven.version>1.4.9</dockerfile-maven.version>
@@ -305,6 +306,13 @@ flexible messaging model and an intuitive client API.</description>
         </exclusions>
       </dependency>
 
+      <!-- reflection libs -->
+      <dependency>
+        <groupId>org.reflections</groupId>
+        <artifactId>reflections</artifactId>
+        <version>${reflections.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>org.apache.bookkeeper</groupId>
         <artifactId>stream-storage-java-client</artifactId>
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/Connector.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/Connector.java
new file mode 100644
index 0000000..7516573
--- /dev/null
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/Connector.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.core.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation that marks a type as an IO connector and describes it for documentation.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface Connector {
+
+    /**
+     * Name of the connector, e.g. {@code kafka}.
+     *
+     * @return name of the connector.
+     */
+    String name();
+
+    /**
+     * Type of the connector (sink or source).
+     *
+     * @return type of the connector.
+     */
+    IOType type();
+
+    /**
+     * Human-readable description of this connector.
+     *
+     * @return description of this connector.
+     */
+    String help();
+
+    /**
+     * Config class used by this connector; its non-static fields should carry {@link FieldDoc}.
+     *
+     * @return config class used by this connector.
+     */
+    Class<?> configClass();
+
+}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/FieldDoc.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/FieldDoc.java
new file mode 100644
index 0000000..465da13
--- /dev/null
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/FieldDoc.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.core.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation for documenting fields in a connector config class.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface FieldDoc {
+
+    /**
+     * Return whether the field is required.
+     *
+     * @return true if the field is required, otherwise false
+     */
+    boolean required() default false;
+
+    /**
+     * Return the default value of this field, or an empty string if it has none.
+     *
+     * @return the default value of this field, or an empty string if there is none
+     */
+    String defaultValue() default "";
+
+    /**
+     * Return the description of this field.
+     *
+     * @return the help message of this field
+     */
+    String help();
+
+}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/IOType.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/IOType.java
new file mode 100644
index 0000000..3e900cf
--- /dev/null
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/IOType.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.core.annotations;
+
+/**
+ * Type of an IO connector; the lower-cased constant name appears in generated config file names.
+ */
+public enum IOType {
+
+    /**
+     * Sink connector: moves messages out of Pulsar (e.g. from Pulsar to Kafka).
+     */
+    SINK,
+
+    /**
+     * Source connector: moves messages into Pulsar (e.g. from Kafka to Pulsar).
+     */
+    SOURCE
+
+}
diff --git a/distribution/io/pom.xml b/pulsar-io/docs/pom.xml
similarity index 56%
copy from distribution/io/pom.xml
copy to pulsar-io/docs/pom.xml
index 0941913..04fffee 100644
--- a/distribution/io/pom.xml
+++ b/pulsar-io/docs/pom.xml
@@ -19,50 +19,39 @@
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
-
   <parent>
     <groupId>org.apache.pulsar</groupId>
-    <artifactId>distribution</artifactId>
+    <artifactId>pulsar-io</artifactId>
     <version>2.3.0-SNAPSHOT</version>
-    <relativePath>..</relativePath>
   </parent>
 
-  <artifactId>pulsar-io-distribution</artifactId>
-  <packaging>pom</packaging>
-  <name>Pulsar :: Distribution :: IO</name>
+  <artifactId>pulsar-io-docs</artifactId>
+  <name>Pulsar IO :: Docs</name>
 
   <dependencies>
+
     <dependency>
-      <groupId>org.apache.pulsar</groupId>
+      <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-core</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.reflections</groupId>
+      <artifactId>reflections</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.beust</groupId>
+      <artifactId>jcommander</artifactId>
+    </dependency>
+
+    <!-- include connectors -->
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-kafka</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>distro-assembly</id>
-            <phase>package</phase>
-            <goals>
-              <goal>single</goal>
-            </goals>
-            <configuration>
-              <attach>true</attach>
-              <tarLongFileMode>posix</tarLongFileMode>
-              <finalName>apache-pulsar-io-connectors-${project.version}</finalName>
-              <descriptors>
-                <descriptor>src/assemble/io.xml</descriptor>
-              </descriptors>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
 </project>
diff --git a/pulsar-io/docs/src/main/java/org/apache/pulsar/io/docs/ConnectorDocGenerator.java
b/pulsar-io/docs/src/main/java/org/apache/pulsar/io/docs/ConnectorDocGenerator.java
new file mode 100644
index 0000000..167e1db
--- /dev/null
+++ b/pulsar-io/docs/src/main/java/org/apache/pulsar/io/docs/ConnectorDocGenerator.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.docs;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+import org.reflections.Reflections;
+import org.reflections.util.ConfigurationBuilder;
+
+/**
+ * Generates an example YAML config file for every class annotated with {@link Connector},
+ * using the {@link FieldDoc} annotations on the fields of each connector's config class.
+ */
+@Slf4j
+public class ConnectorDocGenerator {
+
+    private static final String INDENT = "  ";
+
+    /**
+     * Builds a {@link Reflections} scanner over every URL visible to this class's class loader
+     * and to the current thread's context class loader.
+     *
+     * @throws RuntimeException if either class loader is not a {@link URLClassLoader}
+     */
+    private static Reflections newReflections() throws Exception {
+        List<URL> urls = new ArrayList<>();
+        ClassLoader[] classLoaders = new ClassLoader[] {
+            ConnectorDocGenerator.class.getClassLoader(),
+            Thread.currentThread().getContextClassLoader()
+        };
+        for (ClassLoader classLoader : classLoaders) {
+            if (!(classLoader instanceof URLClassLoader)) {
+                throw new RuntimeException(
+                    "ClassLoader '" + classLoader + "' is not an instance of URLClassLoader");
+            }
+            urls.addAll(Arrays.asList(((URLClassLoader) classLoader).getURLs()));
+        }
+        ConfigurationBuilder confBuilder = new ConfigurationBuilder();
+        confBuilder.setUrls(urls);
+        return new Reflections(confBuilder);
+    }
+
+    private final Reflections reflections;
+
+    public ConnectorDocGenerator() throws Exception {
+        this.reflections = newReflections();
+    }
+
+    /**
+     * Writes {@code text} as YAML comment lines, one per line of text, so a multi-line help
+     * string cannot leak uncommented content into the generated file.
+     */
+    private static void writeComment(PrintWriter writer, String indent, String text) {
+        for (String line : text.split("\r?\n", -1)) {
+            writer.println(indent + "# " + line);
+        }
+    }
+
+    /**
+     * Writes the {@code configs:} section: one entry per non-static field of {@code configClass},
+     * preceded by its help text. Optional fields are written commented out.
+     *
+     * @throws RuntimeException if a non-static field has no {@link FieldDoc} annotation
+     */
+    private void generateConnectorYaml(Class<?> configClass, PrintWriter writer) {
+        log.info("Processing connector config class : {}", configClass);
+
+        writer.println("configs:");
+
+        Field[] fields = configClass.getDeclaredFields();
+        for (Field field : fields) {
+            // Static constants (e.g. serialVersionUID) and compiler/instrumentation-generated
+            // fields are not user-facing configuration.
+            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
+                continue;
+            }
+            FieldDoc fieldDoc = field.getDeclaredAnnotation(FieldDoc.class);
+            if (null == fieldDoc) {
+                throw new RuntimeException("Missing `FieldDoc` for field '" + field.getName() + "'");
+            }
+            writeComment(writer, INDENT, fieldDoc.help());
+            // Required fields stay active; optional ones are emitted as YAML comments.
+            String fieldPrefix = fieldDoc.required() ? "" : "# ";
+            if (Strings.isNullOrEmpty(fieldDoc.defaultValue())) {
+                writer.println(INDENT + fieldPrefix + field.getName() + ":");
+            } else {
+                writer.println(INDENT + fieldPrefix + field.getName() + ": " + fieldDoc.defaultValue());
+            }
+            writer.println();
+        }
+        writer.flush();
+    }
+
+    /**
+     * Writes the header (connector type, class and description) followed by the config section.
+     */
+    private void generateConnectorYaml(Class<?> connectorClass, Connector connectorDef, PrintWriter writer) {
+        log.info("Processing connector definition : {}", connectorDef);
+        writer.println("# " + connectorDef.type() + " connector : " + connectorClass.getName());
+        writer.println();
+        writeComment(writer, "", connectorDef.help());
+        writer.println();
+        generateConnectorYaml(connectorDef.configClass(), writer);
+    }
+
+    /**
+     * Writes one {@code pulsar-io-<name>-<sink|source>.yml} file (UTF-8) into {@code outputDir}
+     * for every class that itself declares a {@link Connector} annotation. The directory is
+     * created if it does not exist.
+     *
+     * @throws IOException if the directory cannot be created or a file cannot be written
+     */
+    private void generateConnectorYamls(String outputDir) throws IOException {
+        Set<Class<?>> connectorClasses = reflections.getTypesAnnotatedWith(Connector.class);
+        log.info("Retrieve all `Connector` annotated classes : {}", connectorClasses);
+
+        Path outputPath = Paths.get(outputDir);
+        Files.createDirectories(outputPath);
+
+        for (Class<?> connectorClass : connectorClasses) {
+            // Reflections may also return subtypes of annotated classes; those do not declare
+            // the annotation themselves, so skip them instead of failing with an NPE.
+            Connector connectorDef = connectorClass.getDeclaredAnnotation(Connector.class);
+            if (null == connectorDef) {
+                log.info("Skipping {} : it does not declare `Connector` itself", connectorClass);
+                continue;
+            }
+            // Locale.ROOT keeps file names stable regardless of the JVM's default locale.
+            Path yamlFile = outputPath.resolve(
+                "pulsar-io-" + connectorDef.name()
+                    + "-" + connectorDef.type().name().toLowerCase(Locale.ROOT) + ".yml");
+            try (PrintWriter pw = new PrintWriter(Files.newBufferedWriter(yamlFile, StandardCharsets.UTF_8))) {
+                generateConnectorYaml(connectorClass, connectorDef, pw);
+                // PrintWriter swallows I/O errors; checkError() flushes and reports them.
+                if (pw.checkError()) {
+                    throw new IOException("Failed to write connector yaml file : " + yamlFile);
+                }
+            }
+        }
+    }
+
+    /**
+     * Command-line arguments for the connector doc generator.
+     */
+    private static class MainArgs {
+
+        @Parameter(
+            names = {
+                "-o", "--output-dir"
+            },
+            description = "The output dir to dump connector docs",
+            required = true
+        )
+        String outputDir = null;
+
+        // `help = true` lets `-h` be used without the otherwise required `-o`.
+        @Parameter(
+            names = {
+                "-h", "--help"
+            },
+            description = "Show this help message",
+            help = true)
+        boolean help = false;
+
+    }
+
+    public static void main(String[] args) throws Exception {
+        MainArgs mainArgs = new MainArgs();
+
+        JCommander commander = new JCommander();
+        try {
+            commander.setProgramName("connector-doc-gen");
+            commander.addObject(mainArgs);
+            commander.parse(args);
+            if (mainArgs.help) {
+                commander.usage();
+                Runtime.getRuntime().exit(0);
+                return;
+            }
+        } catch (Exception e) {
+            // Report why parsing failed (e.g. a missing --output-dir) before printing usage.
+            System.err.println(e.getMessage());
+            commander.usage();
+            Runtime.getRuntime().exit(-1);
+            return;
+        }
+
+        ConnectorDocGenerator docGen = new ConnectorDocGenerator();
+        docGen.generateConnectorYamls(mainArgs.outputDir);
+    }
+
+}
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index 50ce4b9..ac0cce5 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io.kafka;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 
 import lombok.extern.slf4j.Slf4j;
@@ -77,25 +78,28 @@ public abstract class KafkaAbstractSink<K, V> implements Sink<byte[]>
{
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception
{
         kafkaSinkConfig = KafkaSinkConfig.load(config);
-        if (kafkaSinkConfig.getTopic() == null
-                || kafkaSinkConfig.getBootstrapServers() == null
-                || kafkaSinkConfig.getAcks() == null
-                || kafkaSinkConfig.getBatchSize() == 0
-                || kafkaSinkConfig.getMaxRequestSize() == 0) {
-            throw new IllegalArgumentException("Required property not set.");
+        Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not set");
+        Objects.requireNonNull(kafkaSinkConfig.getBootstrapServers(), "Kafka bootstrapServers
is not set");
+        Objects.requireNonNull(kafkaSinkConfig.getAcks(), "Kafka acks mode is not set");
+        if (kafkaSinkConfig.getBatchSize() <= 0) {
+            throw new IllegalArgumentException("Invalid Kafka Producer batchSize : "
+                + kafkaSinkConfig.getBatchSize());
+        }
+        if (kafkaSinkConfig.getMaxRequestSize() <= 0) {
+            throw new IllegalArgumentException("Invalid Kafka Producer maxRequestSize : "
+                + kafkaSinkConfig.getMaxRequestSize());
         }
 
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.getBootstrapServers());
         props.put(ProducerConfig.ACKS_CONFIG, kafkaSinkConfig.getAcks());
-        props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaSinkConfig.getBatchSize().toString());
-        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaSinkConfig.getMaxRequestSize().toString());
-
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(kafkaSinkConfig.getBatchSize()));
+        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(kafkaSinkConfig.getMaxRequestSize()));
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getKeySerializerClass());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getValueSerializerClass());
 
         producer = new KafkaProducer<>(beforeCreateProducer(props));
 
-        log.info("Kafka sink started.");
+        log.info("Kafka sink started : {}.", props);
     }
 
     public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> message);
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index b6c6840..6c6d1da 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.io.kafka;
 
+import java.util.Objects;
 import lombok.Getter;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -53,24 +54,30 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V>
{
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) throws
Exception {
         kafkaSourceConfig = KafkaSourceConfig.load(config);
-        if (kafkaSourceConfig.getTopic() == null
-                || kafkaSourceConfig.getBootstrapServers() == null
-                || kafkaSourceConfig.getGroupId() == null
-                || kafkaSourceConfig.getFetchMinBytes() == 0
-                || kafkaSourceConfig.getAutoCommitIntervalMs() == 0
-                || kafkaSourceConfig.getSessionTimeoutMs() == 0) {
-            throw new IllegalArgumentException("Required property not set.");
+        Objects.requireNonNull(kafkaSourceConfig.getTopic(), "Kafka topic is not set");
+        Objects.requireNonNull(kafkaSourceConfig.getBootstrapServers(), "Kafka bootstrapServers
is not set");
+        Objects.requireNonNull(kafkaSourceConfig.getGroupId(), "Kafka consumer group id is
not set");
+        if (kafkaSourceConfig.getFetchMinBytes() <= 0) {
+            throw new IllegalArgumentException("Invalid Kafka Consumer fetchMinBytes : "
+                + kafkaSourceConfig.getFetchMinBytes());
+        }
+        if (kafkaSourceConfig.isAutoCommitEnabled() && kafkaSourceConfig.getAutoCommitIntervalMs()
<= 0) {
+            throw new IllegalArgumentException("Invalid Kafka Consumer autoCommitIntervalMs
: "
+                + kafkaSourceConfig.getAutoCommitIntervalMs());
+        }
+        if (kafkaSourceConfig.getSessionTimeoutMs() <= 0) {
+            throw new IllegalArgumentException("Invalid Kafka Consumer sessionTimeoutMs :
"
+                + kafkaSourceConfig.getSessionTimeoutMs());
         }
 
         props = new Properties();
 
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSourceConfig.getBootstrapServers());
         props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaSourceConfig.getGroupId());
-        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, kafkaSourceConfig.getFetchMinBytes().toString());
-        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaSourceConfig.getAutoCommitIntervalMs().toString());
-        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaSourceConfig.getSessionTimeoutMs().toString());
+        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, String.valueOf(kafkaSourceConfig.getFetchMinBytes()));
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(kafkaSourceConfig.getAutoCommitIntervalMs()));
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, String.valueOf(kafkaSourceConfig.getSessionTimeoutMs()));
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getKeyDeserializationClass());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getValueDeserializationClass());
 
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
index 9ce2bdc..3e9640a 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
@@ -26,11 +26,19 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 
 /**
  * Kafka sink should treats incoming messages as pure bytes. So we don't
  * apply schema into it.
  */
+@Connector(
+    name = "kafka",
+    type = IOType.SINK,
+    help = "The KafkaBytesSink is used for moving messages from Pulsar to Kafka.",
+    configClass = KafkaSinkConfig.class
+)
 @Slf4j
 public class KafkaBytesSink extends KafkaAbstractSink<String, byte[]> {
 
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
index 1e99208..59cc548 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
@@ -25,11 +25,19 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 
 /**
  * Simple Kafka Source that just transfers the value part of the kafka records
  * as Strings
  */
+@Connector(
+    name = "kafka",
+    type = IOType.SOURCE,
+    help = "The KafkaBytesSource is used for moving messages from Kafka to Pulsar.",
+    configClass = KafkaSourceConfig.class
+)
 @Slf4j
 public class KafkaBytesSource extends KafkaAbstractSource<byte[]> {
 
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
index 23a23ed..e6541ac 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
@@ -28,6 +28,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
 @Setter
@@ -39,13 +40,47 @@ public class KafkaSinkConfig implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help =
+            "A comma-separated list of host and port pairs that are the addresses of "
+          + "the Kafka brokers that a Kafka client connects to initially bootstrap itself")
     private String bootstrapServers;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help =
+            "The number of acknowledgments the producer requires the leader to have received
"
+          + "before considering a request complete. This controls the durability of records
that are sent.")
     private String acks;
-    private Long batchSize;
-    private Long maxRequestSize;
+    @FieldDoc(
+        defaultValue = "16384",
+        help =
+            "The batch size that Kafka producer will attempt to batch records together before
sending them to brokers.")
+    private long batchSize = 16384L;
+    @FieldDoc(
+        defaultValue = "1048576",
+        help =
+            "The maximum size of a Kafka request in bytes.")
+    private long maxRequestSize = 1048576L;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help =
+            "The Kafka topic that is used for Pulsar moving messages to.")
     private String topic;
+    @FieldDoc(
+        defaultValue = "org.apache.kafka.common.serialization.StringSerializer",
+        help =
+            "The serializer class for Kafka producer to serialize keys.")
     private String keySerializerClass = "org.apache.kafka.common.serialization.StringSerializer";
-    private String valueSerializerClass = "org.apache.kafka.common.serialization.StringSerializer";
+    @FieldDoc(
+        defaultValue = "org.apache.kafka.common.serialization.ByteArraySerializer",
+        help =
+            "The serializer class for Kafka producer to serialize values. You typically shouldn't
care this. "
+          + "Since the serializer will be set by a specific implementation of `KafkaAbstractSink`.")
+    private String valueSerializerClass = "org.apache.kafka.common.serialization.ByteArraySerializer";
 
     public static KafkaSinkConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
index 62f6bd5..c60c9de 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
@@ -28,6 +28,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
 @Setter
@@ -39,15 +40,57 @@ public class KafkaSourceConfig implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help =
+            "A comma-separated list of host and port pairs that are the addresses of "
+          + "the Kafka brokers that a Kafka client connects to initially to bootstrap itself.")
     private String bootstrapServers;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help =
+            "A string that uniquely identifies the group of consumer processes to which this consumer belongs.")
     private String groupId;
-    private Long fetchMinBytes;
-    private Long autoCommitIntervalMs;
-    private Long sessionTimeoutMs;
+    @FieldDoc(
+        defaultValue = "1",
+        help =
+            "The minimum amount of data (in bytes) the server should return for a fetch request.")
+    private long fetchMinBytes = 1L;
+    @FieldDoc(
+        defaultValue = "5000",
+        help =
+            "The frequency in milliseconds that the consumer offsets are auto-committed to Kafka "
+          + "if autoCommitEnabled is set to true.")
+    private long autoCommitIntervalMs = 5000L;
+    @FieldDoc(
+        defaultValue = "30000",
+        help =
+            "The timeout used to detect failures when using Kafka's group management facilities.")
+    private long sessionTimeoutMs = 30000L;
+    @FieldDoc(
+        defaultValue = "true",
+        help =
+            "If true, the consumer's offsets will be periodically committed in the background.")
     private boolean autoCommitEnabled = true;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help =
+            "The Kafka topic from which Pulsar reads the messages it moves into Pulsar.")
     private String topic;
+    @FieldDoc(
+        defaultValue = "org.apache.kafka.common.serialization.StringDeserializer",
+        help =
+            "The deserializer class for Kafka consumer to deserialize keys.")
     private String keyDeserializationClass = "org.apache.kafka.common.serialization.StringDeserializer";
-    private String valueDeserializationClass = "org.apache.kafka.common.serialization.StringDeserializer";
+    @FieldDoc(
+        defaultValue = "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+        help =
+            "The deserializer class for Kafka consumer to deserialize values. You typically don't need to set this, "
+          + "since the deserializer will be set by a specific implementation of `KafkaAbstractSource`.")
+    private String valueDeserializationClass = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
 
     public static KafkaSourceConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 5dfba78..28a7f12 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -33,6 +33,7 @@
 
   <modules>
     <module>core</module>
+    <module>docs</module>
     <module>twitter</module>
     <module>cassandra</module>
     <module>aerospike</module>
diff --git a/src/pulsar-io-gen b/src/pulsar-io-gen
new file mode 100755
index 0000000..05ace66
--- /dev/null
+++ b/src/pulsar-io-gen
@@ -0,0 +1,138 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+BINDIR=$(dirname "$0")
+PULSAR_HOME=$(cd "$BINDIR/.." && pwd)
+
+DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml
+
+if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
+then
+    . "$PULSAR_HOME/conf/pulsar_env.sh"
+fi
+
+# Check for the java to use
+if [[ -z $JAVA_HOME ]]; then
+    JAVA=$(which java)
+    if [ $? != 0 ]; then
+        echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2
+        exit 1
+    fi
+else
+    JAVA=$JAVA_HOME/bin/java
+fi
+
+# Use the last non-test jar of a binary distribution, if any ($? would only reflect 'tail').
+RELEASE_JAR=`ls "$PULSAR_HOME"/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
+if [ -n "$RELEASE_JAR" ]; then
+    PULSAR_JAR=$RELEASE_JAR
+fi
+
+# A non-test jar built in pulsar-io/docs/target takes precedence; fail if neither is found.
+BUILT_JAR=`ls "$PULSAR_HOME"/pulsar-io/docs/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
+if [ -z "$BUILT_JAR" ] && [ ! -e "$PULSAR_JAR" ]; then
+    printf "\nCouldn't find pulsar jar.\n" 1>&2
+    printf "Make sure you've run 'mvn package'\n\n" 1>&2
+    exit 1
+elif [ -e "$BUILT_JAR" ]; then
+    PULSAR_JAR=$BUILT_JAR
+fi
+
+add_maven_deps_to_classpath() {
+    # Use $MAVEN_HOME/bin/mvn when MAVEN_HOME is non-empty, otherwise 'mvn' from PATH.
+    MVN=${MAVEN_HOME:+${MAVEN_HOME}/bin/}mvn
+
+    # Building the classpath from the docs pom is costly, so cache it under target/,
+    # where 'mvn clean' removes it. On a maven failure, drop any partial file and abort.
+    f="${PULSAR_HOME}/pulsar-io/docs/target/classpath.txt"
+    if [ ! -f "${f}" ]; then
+        if ! ${MVN} -f "${PULSAR_HOME}/pulsar-io/docs/pom.xml" dependency:build-classpath -Dmdep.outputFile="${f}" &> /dev/null; then
+            rm -f "${f}"
+            echo "Error: failed to build the pulsar-io-docs classpath with ${MVN}." 1>&2
+            exit 1
+        fi
+    fi
+    PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
+}
+pulsar_help() {
+    # Print usage; 'conf' is the only command the dispatcher below actually runs.
+    cat <<EOF
+Usage: pulsar-io-gen <command>
+where command is one of:
+    conf                    Generate sample connector yaml config files
+    help                    This help message
+
+Any other command prints this help message and fails.
+
+Environment variables:
+   PULSAR_LOG_CONF               Log4j configuration file (default $DEFAULT_LOG_CONF)
+   PULSAR_EXTRA_OPTS             Extra options to be passed to the jvm
+   PULSAR_EXTRA_CLASSPATH        Add extra paths to the pulsar classpath
+
+These variables can also be set in conf/pulsar_env.sh
+EOF
+}
+
+if [ -d "$PULSAR_HOME/lib" ]; then
+    PULSAR_CLASSPATH="$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*"
+else
+    add_maven_deps_to_classpath
+fi
+
+PULSAR_LOG_CONF=${PULSAR_LOG_CONF:-$DEFAULT_LOG_CONF}
+
+PULSAR_CLASSPATH="$PULSAR_JAR:$PULSAR_HOME/pulsar-io/docs/target/pulsar-io-docs.jar:$PULSAR_CLASSPATH:$PULSAR_EXTRA_CLASSPATH"
+PULSAR_CLASSPATH="$(dirname "$PULSAR_LOG_CONF"):$PULSAR_CLASSPATH"
+OPTS="$OPTS -Dlog4j.configurationFile=$(basename "$PULSAR_LOG_CONF") -Djava.net.preferIPv4Stack=true"
+
+OPTS="-cp $PULSAR_CLASSPATH $OPTS $PULSAR_EXTRA_OPTS"
+
+# log directory & file
+PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"Console"}
+PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
+PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-io-gen.log"}
+
+# Configure log configuration system properties
+OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
+OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
+OPTS="$OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE"
+
+# Change to PULSAR_HOME to support relative paths
+cd "$PULSAR_HOME"
+
+# if no args specified, show usage
+if [ $# = 0 ]; then
+    pulsar_help;
+    exit 1;
+fi
+
+COMMAND=$1
+shift
+
+# 'help' exits 0; an unknown command prints usage and exits 1, like the no-args case.
+case "$COMMAND" in
+    conf)
+        # Echo the resolved command on stderr so it does not mix with anything written to stdout.
+        echo $JAVA $OPTS org.apache.pulsar.io.docs.ConnectorDocGenerator "$@" 1>&2
+        exec $JAVA $OPTS org.apache.pulsar.io.docs.ConnectorDocGenerator "$@"
+        ;;
+    help) pulsar_help ;;
+    *) pulsar_help; exit 1 ;;
+esac


Mime
View raw message