pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #2936: [io][docs] introduce annotations for generating connector yaml config files
Date Sun, 25 Nov 2018 19:41:28 GMT
sijie closed pull request #2936: [io][docs] introduce annotations for generating connector
yaml config files
URL: https://github.com/apache/pulsar/pull/2936
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/distribution/io/pom.xml b/distribution/io/pom.xml
index 0941913ea1..67e35096f5 100644
--- a/distribution/io/pom.xml
+++ b/distribution/io/pom.xml
@@ -39,10 +39,55 @@
       <artifactId>pulsar-io-core</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-io-docs</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 
   <build>
     <plugins>
+      <plugin>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create-conf-dir</id>
+            <phase>compile</phase>
+            <configuration>
+              <tasks>
+                <mkdir dir="target/conf"/>
+              </tasks>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>${exec-maven-plugin.version}</version>
+        <executions>
+          <execution>
+            <id>io-conf-gen</id>
+            <phase>package</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <executable>${project.basedir}/../../src/pulsar-io-gen</executable>
+              <arguments>
+                <argument>conf</argument>
+                <argument>-o</argument>
+                <argument>${project.basedir}/target/conf</argument>
+              </arguments>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
       <plugin>
         <artifactId>maven-assembly-plugin</artifactId>
         <executions>
diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index 1a190a47ca..a0704c32b3 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -27,6 +27,12 @@
     <format>tar.gz</format>
   </formats>
   <includeBaseDirectory>true</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/../../distribution/io/target/conf</directory>
+      <outputDirectory>conf</outputDirectory>
+    </fileSet>
+  </fileSets>
   <files>
     <file>
       <source>${basedir}/../../LICENSE</source>
diff --git a/pom.xml b/pom.xml
index b6b9420221..6974ee7382 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,6 +156,7 @@ flexible messaging model and an intuitive client API.</description>
     <log4j2.version>2.10.0</log4j2.version>
     <bouncycastle.version>1.55</bouncycastle.version>
     <jackson.version>2.9.7</jackson.version>
+    <reflections.version>0.9.11</reflections.version>
     <swagger.version>1.5.21</swagger.version>
     <puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>
     <dockerfile-maven.version>1.3.7</dockerfile-maven.version>
@@ -303,6 +304,13 @@ flexible messaging model and an intuitive client API.</description>
         </exclusions>
       </dependency>
 
+      <!-- reflection libs -->
+      <dependency>
+        <groupId>org.reflections</groupId>
+        <artifactId>reflections</artifactId>
+        <version>${reflections.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>org.apache.bookkeeper</groupId>
         <artifactId>stream-storage-java-client</artifactId>
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/Connector.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/Connector.java
new file mode 100644
index 0000000000..751657324e
--- /dev/null
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/Connector.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.core.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation for documenting connectors.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface Connector {
+
+    /**
+     * Name of the connector.
+     *
+     * @return name of the connector.
+     */
+    String name();
+
+    /**
+     * Type of the connector.
+     *
+     * @return type of the connector.
+     */
+    IOType type();
+
+    /**
+     * Description of this connector.
+     *
+     * @return description of this connector.
+     */
+    String help();
+
+    /**
+     * Config class used by this connector.
+     *
+     * @return config class used by this connector.
+     */
+    Class configClass();
+
+}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/FieldDoc.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/FieldDoc.java
new file mode 100644
index 0000000000..465da13d82
--- /dev/null
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/FieldDoc.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.core.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation for documenting fields in a config.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface FieldDoc {
+
+    /**
+     * Return if the field is required or not.
+     *
+     * @return true if the field is required, otherwise false
+     */
+    boolean required() default false;
+
+    /**
+     * Return the value of this field.
+     *
+     * @return the default value of this field
+     */
+    String defaultValue();
+
+    /**
+     * Return the description of this field.
+     *
+     * @return the help message of this field
+     */
+    String help();
+
+}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/IOType.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/IOType.java
new file mode 100644
index 0000000000..3e900cf8b2
--- /dev/null
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/IOType.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.core.annotations;
+
+/**
+ * Type of the io connector.
+ */
+public enum IOType {
+
+    /**
+     * Sink connector.
+     */
+    SINK,
+
+    /**
+     * Source connector.
+     */
+    SOURCE
+
+}
diff --git a/pulsar-io/docs/pom.xml b/pulsar-io/docs/pom.xml
new file mode 100644
index 0000000000..04fffeee58
--- /dev/null
+++ b/pulsar-io/docs/pom.xml
@@ -0,0 +1,57 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-io</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-io-docs</artifactId>
+  <name>Pulsar IO :: Docs</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.reflections</groupId>
+      <artifactId>reflections</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.beust</groupId>
+      <artifactId>jcommander</artifactId>
+    </dependency>
+
+    <!-- include connectors -->
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-kafka</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/pulsar-io/docs/src/main/java/org/apache/pulsar/io/docs/ConnectorDocGenerator.java b/pulsar-io/docs/src/main/java/org/apache/pulsar/io/docs/ConnectorDocGenerator.java
new file mode 100644
index 0000000000..167e1db088
--- /dev/null
+++ b/pulsar-io/docs/src/main/java/org/apache/pulsar/io/docs/ConnectorDocGenerator.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.docs;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.google.common.base.Strings;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+import org.reflections.Reflections;
+import org.reflections.util.ConfigurationBuilder;
+
+@Slf4j
+public class ConnectorDocGenerator {
+
+    private static final String INDENT = "  ";
+
+    private static Reflections newReflections() throws Exception {
+        List<URL> urls = new ArrayList<>();
+        ClassLoader[] classLoaders = new ClassLoader[] {
+            ConnectorDocGenerator.class.getClassLoader(),
+            Thread.currentThread().getContextClassLoader()
+        };
+        for (int i = 0; i < classLoaders.length; i++) {
+            if (classLoaders[i] instanceof URLClassLoader) {
+                urls.addAll(Arrays.asList(((URLClassLoader) classLoaders[i]).getURLs()));
+            } else {
+                throw new RuntimeException("ClassLoader '" + classLoaders[i] + " is not an instance of URLClassLoader");
+            }
+        }
+        ConfigurationBuilder confBuilder = new ConfigurationBuilder();
+        confBuilder.setUrls(urls);
+        return new Reflections(confBuilder);
+    }
+
+    private final Reflections reflections;
+
+    public ConnectorDocGenerator() throws Exception {
+        this.reflections = newReflections();
+    }
+
+    private void generateConnectorYaml(Class configClass, PrintWriter writer) {
+        log.info("Processing connector config class : {}", configClass);
+
+        writer.println("configs:");
+
+        Field[] fields = configClass.getDeclaredFields();
+        for (Field field : fields) {
+            if (Modifier.isStatic(field.getModifiers())) {
+                continue;
+            }
+            FieldDoc fieldDoc = field.getDeclaredAnnotation(FieldDoc.class);
+            if (null == fieldDoc) {
+                throw new RuntimeException("Missing `FieldDoc` for field '" + field.getName() + "'");
+            }
+            writer.println(INDENT + "# " + fieldDoc.help());
+            String fieldPrefix = "";
+            if (!fieldDoc.required()) {
+                fieldPrefix = "# ";
+            }
+            if (Strings.isNullOrEmpty(fieldDoc.defaultValue())) {
+                writer.println(INDENT + fieldPrefix + field.getName() + ":");
+            } else {
+                writer.println(INDENT + fieldPrefix + field.getName() + ": " + fieldDoc.defaultValue());
+            }
+            writer.println();
+        }
+        writer.flush();
+    }
+
+    private void generateConnectorYaml(Class connectorClass, Connector connectorDef, PrintWriter writer) {
+        log.info("Processing connector definition : {}", connectorDef);
+        writer.println("# " + connectorDef.type() + " connector : " + connectorClass.getName());
+        writer.println();
+        writer.println("# " + connectorDef.help());
+        writer.println();
+        generateConnectorYaml(connectorDef.configClass(), writer);
+    }
+
+    private void generatorConnectorYamls(String outputDir) throws IOException  {
+        Set<Class<?>> connectorClasses = reflections.getTypesAnnotatedWith(Connector.class);
+        log.info("Retrieve all `Connector` annotated classes : {}", connectorClasses);
+
+        for (Class<?> connectorClass : connectorClasses) {
+            Connector connectorDef = connectorClass.getDeclaredAnnotation(Connector.class);
+            try (FileWriter fileWriter = new FileWriter(
+                Paths.get(
+                    outputDir,
+                    "pulsar-io-" + connectorDef.name()
+                        + "-" + connectorDef.type().name().toLowerCase()).toString() + ".yml")) {
+                PrintWriter pw = new PrintWriter(fileWriter);
+                generateConnectorYaml(connectorClass, connectorDef, pw);
+                pw.flush();
+            }
+        }
+    }
+
+    /**
+     * Args for stats generator.
+     */
+    private static class MainArgs {
+
+        @Parameter(
+            names = {
+                "-o", "--output-dir"
+            },
+            description = "The output dir to dump connector docs",
+            required = true
+        )
+        String outputDir = null;
+
+        @Parameter(
+            names = {
+                "-h", "--help"
+            },
+            description = "Show this help message")
+        boolean help = false;
+
+    }
+
+    public static void main(String[] args) throws Exception {
+        MainArgs mainArgs = new MainArgs();
+
+        JCommander commander = new JCommander();
+        try {
+            commander.setProgramName("connector-doc-gen");
+            commander.addObject(mainArgs);
+            commander.parse(args);
+            if (mainArgs.help) {
+                commander.usage();
+                Runtime.getRuntime().exit(0);
+                return;
+            }
+        } catch (Exception e) {
+            commander.usage();
+            Runtime.getRuntime().exit(-1);
+            return;
+        }
+
+        ConnectorDocGenerator docGen = new ConnectorDocGenerator();
+        docGen.generatorConnectorYamls(mainArgs.outputDir);
+    }
+
+}
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index 50ce4b97b6..ac0cce5206 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 
 import lombok.extern.slf4j.Slf4j;
@@ -77,25 +78,28 @@ protected Properties beforeCreateProducer(Properties props) {
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
         kafkaSinkConfig = KafkaSinkConfig.load(config);
-        if (kafkaSinkConfig.getTopic() == null
-                || kafkaSinkConfig.getBootstrapServers() == null
-                || kafkaSinkConfig.getAcks() == null
-                || kafkaSinkConfig.getBatchSize() == 0
-                || kafkaSinkConfig.getMaxRequestSize() == 0) {
-            throw new IllegalArgumentException("Required property not set.");
+        Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not set");
+        Objects.requireNonNull(kafkaSinkConfig.getBootstrapServers(), "Kafka bootstrapServers is not set");
+        Objects.requireNonNull(kafkaSinkConfig.getAcks(), "Kafka acks mode is not set");
+        if (kafkaSinkConfig.getBatchSize() <= 0) {
+            throw new IllegalArgumentException("Invalid Kafka Producer batchSize : "
+                + kafkaSinkConfig.getBatchSize());
+        }
+        if (kafkaSinkConfig.getMaxRequestSize() <= 0) {
+            throw new IllegalArgumentException("Invalid Kafka Producer maxRequestSize : "
+                + kafkaSinkConfig.getMaxRequestSize());
         }
 
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.getBootstrapServers());
         props.put(ProducerConfig.ACKS_CONFIG, kafkaSinkConfig.getAcks());
-        props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaSinkConfig.getBatchSize().toString());
-        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaSinkConfig.getMaxRequestSize().toString());
-
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(kafkaSinkConfig.getBatchSize()));
+        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(kafkaSinkConfig.getMaxRequestSize()));
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getKeySerializerClass());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getValueSerializerClass());
 
         producer = new KafkaProducer<>(beforeCreateProducer(props));
 
-        log.info("Kafka sink started.");
+        log.info("Kafka sink started : {}.", props);
     }
 
     public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> message);
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index b6c6840cc3..6c6d1dabe8 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.io.kafka;
 
+import java.util.Objects;
 import lombok.Getter;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -53,24 +54,30 @@
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
         kafkaSourceConfig = KafkaSourceConfig.load(config);
-        if (kafkaSourceConfig.getTopic() == null
-                || kafkaSourceConfig.getBootstrapServers() == null
-                || kafkaSourceConfig.getGroupId() == null
-                || kafkaSourceConfig.getFetchMinBytes() == 0
-                || kafkaSourceConfig.getAutoCommitIntervalMs() == 0
-                || kafkaSourceConfig.getSessionTimeoutMs() == 0) {
-            throw new IllegalArgumentException("Required property not set.");
+        Objects.requireNonNull(kafkaSourceConfig.getTopic(), "Kafka topic is not set");
+        Objects.requireNonNull(kafkaSourceConfig.getBootstrapServers(), "Kafka bootstrapServers is not set");
+        Objects.requireNonNull(kafkaSourceConfig.getGroupId(), "Kafka consumer group id is not set");
+        if (kafkaSourceConfig.getFetchMinBytes() <= 0) {
+            throw new IllegalArgumentException("Invalid Kafka Consumer fetchMinBytes : "
+                + kafkaSourceConfig.getFetchMinBytes());
+        }
+        if (kafkaSourceConfig.isAutoCommitEnabled() && kafkaSourceConfig.getAutoCommitIntervalMs() <= 0) {
+            throw new IllegalArgumentException("Invalid Kafka Consumer autoCommitIntervalMs : "
+                + kafkaSourceConfig.getAutoCommitIntervalMs());
+        }
+        if (kafkaSourceConfig.getSessionTimeoutMs() <= 0) {
+            throw new IllegalArgumentException("Invalid Kafka Consumer sessionTimeoutMs : "
+                + kafkaSourceConfig.getSessionTimeoutMs());
         }
 
         props = new Properties();
 
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSourceConfig.getBootstrapServers());
         props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaSourceConfig.getGroupId());
-        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, kafkaSourceConfig.getFetchMinBytes().toString());
-        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaSourceConfig.getAutoCommitIntervalMs().toString());
-        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaSourceConfig.getSessionTimeoutMs().toString());
+        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, String.valueOf(kafkaSourceConfig.getFetchMinBytes()));
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(kafkaSourceConfig.getAutoCommitIntervalMs()));
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, String.valueOf(kafkaSourceConfig.getSessionTimeoutMs()));
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getKeyDeserializationClass());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getValueDeserializationClass());
 
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
index 9ce2bdcb77..3e9640a697 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
@@ -26,11 +26,19 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 
 /**
  * Kafka sink should treats incoming messages as pure bytes. So we don't
  * apply schema into it.
  */
+@Connector(
+    name = "kafka",
+    type = IOType.SINK,
+    help = "The KafkaBytesSink is used for moving messages from Pulsar to Kafka.",
+    configClass = KafkaSinkConfig.class
+)
 @Slf4j
 public class KafkaBytesSink extends KafkaAbstractSink<String, byte[]> {
 
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
index 1e99208611..59cc548fa2 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
@@ -25,11 +25,19 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
 
 /**
  * Simple Kafka Source that just transfers the value part of the kafka records
  * as Strings
  */
+@Connector(
+    name = "kafka",
+    type = IOType.SOURCE,
+    help = "The KafkaBytesSource is used for moving messages from Kafka to Pulsar.",
+    configClass = KafkaSourceConfig.class
+)
 @Slf4j
 public class KafkaBytesSource extends KafkaAbstractSource<byte[]> {
 
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
index 23a23edc01..e6541acc98 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
 @Setter
@@ -39,13 +40,47 @@
 
     private static final long serialVersionUID = 1L;
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help =
+            "A comma-separated list of host and port pairs that are the addresses of "
+          + "the Kafka brokers that a Kafka client connects to initially bootstrap itself")
     private String bootstrapServers;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help =
+            "The number of acknowledgments the producer requires the leader to have received "
+          + "before considering a request complete. This controls the durability of records that are sent.")
     private String acks;
-    private Long batchSize;
-    private Long maxRequestSize;
+    @FieldDoc(
+        defaultValue = "16384",
+        help =
+            "The batch size that Kafka producer will attempt to batch records together before sending them to brokers.")
+    private long batchSize = 16384L;
+    @FieldDoc(
+        defaultValue = "16384",
+        help =
+            "The maximum size of a Kafka request in bytes.")
+    private long maxRequestSize = 1048576L;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help =
+            "The Kafka topic that is used for Pulsar moving messages to.")
     private String topic;
+    @FieldDoc(
+        defaultValue = "org.apache.kafka.common.serialization.StringSerializer",
+        help =
+            "The serializer class for Kafka producer to serialize keys.")
     private String keySerializerClass = "org.apache.kafka.common.serialization.StringSerializer";
-    private String valueSerializerClass = "org.apache.kafka.common.serialization.StringSerializer";
+    @FieldDoc(
+        defaultValue = "org.apache.kafka.common.serialization.ByteArraySerializer",
+        help =
+            "The serializer class for Kafka producer to serialize values. You typically shouldn't care this. "
+          + "Since the serializer will be set by a specific implementation of `KafkaAbstractSink`.")
+    private String valueSerializerClass = "org.apache.kafka.common.serialization.ByteArraySerializer";
 
     public static KafkaSinkConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
index 62f6bd5847..c60c9debe7 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
 @Setter
@@ -39,15 +40,57 @@
 
     private static final long serialVersionUID = 1L;
 
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help =
+            "A comma-separated list of host and port pairs that are the addresses of "
+          + "the Kafka brokers that a Kafka client connects to initially bootstrap itself")
     private String bootstrapServers;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help =
+            "A string that uniquely identifies the group of consumer processes to which this consumer belongs.")
     private String groupId;
-    private Long fetchMinBytes;
-    private Long autoCommitIntervalMs;
-    private Long sessionTimeoutMs;
+    @FieldDoc(
+        defaultValue = "1",
+        help =
+            "The minimum amount of data the server should return for a fetch request.")
+    private long fetchMinBytes = 1L;
+    @FieldDoc(
+        defaultValue = "5000",
+        help =
+            "The frequency in milliseconds that the consumer offsets are auto-committed to Kafka "
+          + "if autoCommitEnabled is set to true.")
+    private long autoCommitIntervalMs = 5000L;
+    @FieldDoc(
+        defaultValue = "30000",
+        help =
+            "The timeout used to detect failures when using Kafka's group management facilities.")
+    private long sessionTimeoutMs = 30000L;
+    @FieldDoc(
+        defaultValue = "true",
+        help =
+            "If true the consumer's offset will be periodically committed in the background.")
     private boolean autoCommitEnabled = true;
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help =
+            "The Kafka topic that is used for Pulsar moving messages to.")
     private String topic;
+    @FieldDoc(
+        defaultValue = "org.apache.kafka.common.serialization.StringDeserializer",
+        help =
+            "The deserializer class for Kafka consumer to deserialize keys.")
     private String keyDeserializationClass = "org.apache.kafka.common.serialization.StringDeserializer";
-    private String valueDeserializationClass = "org.apache.kafka.common.serialization.StringDeserializer";
+    @FieldDoc(
+        defaultValue = "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+        help =
+            "The deserializer class for Kafka consumer to deserialize values. You typically shouldn't care this. "
+                + "Since the deserializer will be set by a specific implementation of `KafkaAbstractSource`.")
+    private String valueDeserializationClass = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
 
     public static KafkaSourceConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 99b501c2be..12d8edf0f2 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -33,6 +33,7 @@
 
   <modules>
     <module>core</module>
+    <module>docs</module>
     <module>twitter</module>
     <module>cassandra</module>
     <module>aerospike</module>
diff --git a/src/pulsar-io-gen b/src/pulsar-io-gen
new file mode 100755
index 0000000000..05ace66b74
--- /dev/null
+++ b/src/pulsar-io-gen
@@ -0,0 +1,138 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+BINDIR=$(dirname "$0")
+PULSAR_HOME=`cd $BINDIR/..;pwd`
+
+DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml
+
+if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
+then
+    . "$PULSAR_HOME/conf/pulsar_env.sh"
+fi
+
+# Check for the java to use
+if [[ -z $JAVA_HOME ]]; then
+    JAVA=$(which java)
+    if [ $? != 0 ]; then
+        echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2
+        exit 1
+    fi
+else
+    JAVA=$JAVA_HOME/bin/java
+fi
+
+# exclude tests jar
+RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
+if [ $? == 0 ]; then
+    PULSAR_JAR=$RELEASE_JAR
+fi
+
+# exclude tests jar
+BUILT_JAR=`ls $PULSAR_HOME/pulsar-io/docs/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
+if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then
+    echo "\nCouldn't find pulsar jar.";
+    echo "Make sure you've run 'mvn package'\n";
+    exit 1;
+elif [ -e "$BUILT_JAR" ]; then
+    PULSAR_JAR=$BUILT_JAR
+fi
+
+add_maven_deps_to_classpath() {
+    MVN="mvn"
+    if [ "$MAVEN_HOME" != "" ]; then
+	MVN=${MAVEN_HOME}/bin/mvn
+    fi
+
+    # Need to generate classpath from maven pom. This is costly so generate it
+    # and cache it. Save the file into our target dir so a mvn clean will get
+    # clean it up and force us create a new one.
+    f="${PULSAR_HOME}/pulsar-io/docs/target/classpath.txt"
+    if [ ! -f "${f}" ]
+    then
+	${MVN} -f "${PULSAR_HOME}/pulsar-io/docs/pom.xml" dependency:build-classpath -Dmdep.outputFile="${f}" &> /dev/null
+    fi
+    PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
+}
+pulsar_help() {
+    cat <<EOF
+Usage: pulsar <command>
+where command is one of:
+    conf                    Generate sample yaml files
+
+    help                           This help message
+
+or command is the full name of a class with a defined main() method.
+
+Environment variables:
+   PULSAR_LOG_CONF               Log4j configuration file (default $DEFAULT_LOG_CONF)
+   PULSAR_EXTRA_OPTS             Extra options to be passed to the jvm
+   PULSAR_EXTRA_CLASSPATH        Add extra paths to the pulsar classpath
+
+These variable can also be set in conf/pulsar_env.sh
+EOF
+}
+
+if [ -d "$PULSAR_HOME/lib" ]; then
+	PULSAR_CLASSPATH="$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*"
+else
+    add_maven_deps_to_classpath
+fi
+
+if [ -z "$PULSAR_LOG_CONF" ]; then
+    PULSAR_LOG_CONF=$DEFAULT_LOG_CONF
+fi
+
+PULSAR_CLASSPATH="$PULSAR_JAR:$PULSAR_HOME/pulsar-io/docs/target/pulsar-io-docs.jar:$PULSAR_CLASSPATH:$PULSAR_EXTRA_CLASSPATH"
+PULSAR_CLASSPATH="`dirname $PULSAR_LOG_CONF`:$PULSAR_CLASSPATH"
+OPTS="$OPTS -Dlog4j.configurationFile=`basename $PULSAR_LOG_CONF` -Djava.net.preferIPv4Stack=true"
+
+OPTS="-cp $PULSAR_CLASSPATH $OPTS"
+OPTS="$OPTS $PULSAR_EXTRA_OPTS"
+
+# log directory & file
+PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"Console"}
+PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
+PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-perftest.log"}
+
+#Configure log configuration system properties
+OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
+OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
+OPTS="$OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE"
+
+#Change to PULSAR_HOME to support relative paths
+cd "$PULSAR_HOME"
+
+# if no args specified, show usage
+if [ $# = 0 ]; then
+    pulsar_help;
+    exit 1;
+fi
+
+# get arguments
+COMMAND=$1
+shift
+
+if [ "$COMMAND" == "conf" ]; then
+    echo $JAVA $OPTS org.apache.pulsar.io.docs.ConnectorDocGenerator "$@"
+    exec $JAVA $OPTS org.apache.pulsar.io.docs.ConnectorDocGenerator "$@"
+else
+    pulsar_help;
+fi


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message