pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: [pulsar-flink] Implements a batch program on Pulsar topic by writing Flink DataSet as Avro (#3205)
Date Tue, 18 Dec 2018 09:48:17 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5493e2f  [pulsar-flink] Implements a batch program on Pulsar topic by writing Flink
DataSet as Avro (#3205)
5493e2f is described below

commit 5493e2fa447a6efac472960d9fa41eff4718c379
Author: wpl <1269223860@qq.com>
AuthorDate: Tue Dec 18 17:48:11 2018 +0800

    [pulsar-flink] Implements a batch program on Pulsar topic by writing Flink DataSet as
Avro (#3205)
    
    Implements a batch program on Pulsar topic by writing Flink DataSet as Avro data Type
---
 .gitignore                                         |  3 +
 examples/flink-consumer-source/pom.xml             | 25 ++++++++
 .../example/FlinkPulsarBatchAvroSinkExample.java   | 74 ++++++++++++++++++++++
 .../batch/connectors/pulsar/example/README.md      | 68 +++++++++++++++++++-
 .../src/main/resources/avro/NasaMission.avsc       | 10 +++
 .../FlinkPulsarBatchAvroSinkScalaExample.scala     | 72 +++++++++++++++++++++
 .../batch/connectors/pulsar/example/README.md      | 68 +++++++++++++++++++-
 pulsar-flink/pom.xml                               | 35 ++++++++++
 .../connectors/pulsar/PulsarAvroOutputFormat.java  | 36 +++++++++++
 .../serialization/AvroSerializationSchema.java     | 61 ++++++++++++++++++
 .../pulsar/PulsarAvroOutputFormatTest.java         | 56 ++++++++++++++++
 .../serialization/AvroSerializationSchemaTest.java | 65 +++++++++++++++++++
 .../src/test/resources/avro/NasaMission.avsc       | 10 +++
 13 files changed, 577 insertions(+), 6 deletions(-)

diff --git a/.gitignore b/.gitignore
index f605f1e..d16fa61 100644
--- a/.gitignore
+++ b/.gitignore
@@ -78,3 +78,6 @@ docker.debug-info
 
 **/website/i18n/*
 **/website/translated_docs*
+
+examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/avro/generated
+pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/avro/generated
diff --git a/examples/flink-consumer-source/pom.xml b/examples/flink-consumer-source/pom.xml
index 2f674fc..0cea049 100644
--- a/examples/flink-consumer-source/pom.xml
+++ b/examples/flink-consumer-source/pom.xml
@@ -51,6 +51,13 @@
   </dependencies>
 
   <build>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+        <filtering>true</filtering>
+      </resource>
+    </resources>
+
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
@@ -86,6 +93,24 @@
           </execution>
         </executions>
       </plugin>
+      <!-- Generate the NasaMission class from the Avro schema in src/main/resources/avro -->
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>${avro.version}</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+            <configuration>
+              <testSourceDirectory>${project.basedir}/src/main/resources/avro</testSourceDirectory>
+              <testOutputDirectory>${project.basedir}/src/main/java/</testOutputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
new file mode 100644
index 0000000..553bf6c
--- /dev/null
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.example;
+
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat;
+import org.apache.flink.batch.connectors.pulsar.avro.generated.NasaMission;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Implements a batch program on Pulsar topic by writing Flink DataSet as Avro.
+ */
+public class FlinkPulsarBatchAvroSinkExample {
+
+    private static final List<NasaMission> nasaMissions = Arrays.asList(
+            NasaMission.newBuilder().setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build(),
+            NasaMission.newBuilder().setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build(),
+            NasaMission.newBuilder().setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build(),
+            NasaMission.newBuilder().setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build(),
+            NasaMission.newBuilder().setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build());
+
+    private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
+    private static final String TOPIC_NAME = "my-flink-topic";
+
+    public static void main(String[] args) throws Exception {
+
+        // set up the execution environment
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        // create PulsarAvroOutputFormat instance
+        final OutputFormat<NasaMission> pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(SERVICE_URL,
TOPIC_NAME);
+
+        // create DataSet
+        DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
+        // copy each mission into a new NasaMission record (the name is passed through unchanged)
+        nasaMissionDS.map(nasaMission -> new NasaMission(
+                nasaMission.getId(),
+                nasaMission.getName(),
+                nasaMission.getStartYear(),
+                nasaMission.getEndYear()))
+                // filter missions which started after 1970
+                .filter(nasaMission -> nasaMission.getStartYear() > 1970)
+                // write batch data to Pulsar
+                .output(pulsarAvroOutputFormat);
+
+        // set parallelism to write Pulsar in parallel (optional)
+        env.setParallelism(2);
+
+        // execute program
+        env.execute("Flink - Pulsar Batch Avro");
+    }
+
+}
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
index 2f99e76..3116b3b 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -113,7 +113,7 @@ world
 
 ### Complete Example
 
-You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java).
+You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java).
 In this example, Flink DataSet is processed as word-count and being written to Pulsar.
 
 ### Complete Example Output
@@ -192,7 +192,7 @@ Please find sample output for above application as follows:
 
 ### Complete Example
 
-You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java).
+You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java).
 In this example, Flink DataSet is processed and written to Pulsar in Csv format.
 
 
@@ -290,5 +290,67 @@ Please find sample output for above application as follows:
 
 ### Complete Example
 
-You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java).
+You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java).
 In this example, Flink DataSet is processed and written to Pulsar in Json format.
+
+
+# PulsarAvroOutputFormat
+### Usage
+
+Please find a sample usage as follows:
+
+```java
+        private static final List<NasaMission> nasaMissions = Arrays.asList(
+                    NasaMission.newBuilder().setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build(),
+                    NasaMission.newBuilder().setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build(),
+                    NasaMission.newBuilder().setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build(),
+                    NasaMission.newBuilder().setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build(),
+                    NasaMission.newBuilder().setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build());
+        
+            private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
+            private static final String TOPIC_NAME = "my-flink-topic";
+        
+            public static void main(String[] args) throws Exception {
+        
+                // set up the execution environment
+                final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        
+                // create PulsarAvroOutputFormat instance
+                final OutputFormat<NasaMission> pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(SERVICE_URL,
TOPIC_NAME);
+        
+                // create DataSet
+                DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
+                // copy each mission into a new NasaMission record (the name is passed through unchanged)
+                nasaMissionDS.map(nasaMission -> new NasaMission(
+                        nasaMission.getId(),
+                        nasaMission.getName(),
+                        nasaMission.getStartYear(),
+                        nasaMission.getEndYear()))
+                        // filter missions which started after 1970
+                        .filter(nasaMission -> nasaMission.getStartYear() > 1970)
+                        // write batch data to Pulsar
+                        .output(pulsarAvroOutputFormat);
+        
+                // set parallelism to write Pulsar in parallel (optional)
+                env.setParallelism(2);
+        
+                // execute program
+                env.execute("Flink - Pulsar Batch Avro");
+            }
+
+```
+
+**Note:** The `NasaMission` class is automatically generated by Avro from the `NasaMission.avsc` schema.
+
+### Sample Output
+
+Please find sample output for above application as follows:
+```
+ "4,SKYLAB,1973,1974"
+ "5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
+```
+
+### Complete Example
+
+You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java).
+In this example, Flink DataSet is processed and written to Pulsar in Avro format.
diff --git a/examples/flink-consumer-source/src/main/resources/avro/NasaMission.avsc b/examples/flink-consumer-source/src/main/resources/avro/NasaMission.avsc
new file mode 100644
index 0000000..4a669e0
--- /dev/null
+++ b/examples/flink-consumer-source/src/main/resources/avro/NasaMission.avsc
@@ -0,0 +1,10 @@
+{"namespace": "org.apache.flink.batch.connectors.pulsar.avro.generated",
+ "type": "record",
+ "name": "NasaMission",
+ "fields": [
+     {"name": "id", "type": "int"},
+     {"name": "name", "type": "string"},
+     {"name": "start_year",  "type": ["int", "null"]},
+     {"name": "end_year", "type": ["int", "null"]}
+ ]
+}
diff --git a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
new file mode 100644
index 0000000..5f95611
--- /dev/null
+++ b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.example
+
+import org.apache.flink.api.scala._
+import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
+import org.apache.flink.batch.connectors.pulsar.avro.generated.NasaMission
+
+/**
+  * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Avro.
+  */
+object FlinkPulsarBatchAvroSinkScalaExample {
+
+  private val SERVICE_URL = "pulsar://127.0.0.1:6650"
+  private val TOPIC_NAME = "my-flink-topic"
+
+  val nasaMissions = List(
+    NasaMission.newBuilder.setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build,
+    NasaMission.newBuilder.setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build,
+    NasaMission.newBuilder.setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build,
+    NasaMission.newBuilder.setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build,
+    NasaMission.newBuilder.setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build)
+
+  def main(args: Array[String]): Unit = {
+
+    // set up the execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // create PulsarAvroOutputFormat instance
+    val pulsarAvroOutputFormat =
+      new PulsarAvroOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+
+    // create DataSet
+    val textDS = env.fromCollection(nasaMissions)
+
+    // copy each mission into a new NasaMission record (the name is passed through unchanged)
+    textDS.map(nasaMission => new NasaMission(
+      nasaMission.getId,
+      nasaMission.getName,
+      nasaMission.getStartYear,
+      nasaMission.getEndYear))
+
+      // filter missions which started after 1970
+      .filter(_.getStartYear > 1970)
+
+      // write batch data to Pulsar as Avro
+      .output(pulsarAvroOutputFormat)
+
+    // set parallelism to write Pulsar in parallel (optional)
+    env.setParallelism(2)
+
+    // execute program
+    env.execute("Flink - Pulsar Batch Avro")
+  }
+
+}
diff --git a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
index b3f8cb5..e206392 100644
--- a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -131,7 +131,7 @@ WordWithCount { word = world, count = 1 }
 
 ### Complete Example
 
-You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala).
+You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala).
 In this example, Flink DataSet is processed as word-count and being written to Pulsar.
 
 
@@ -203,7 +203,7 @@ Please find sample output for above application as follows:
 
 ### Complete Example
 
-You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala).
+You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala).
 In this example, Flink DataSet is processed and written to Pulsar in Csv format.
 
 
@@ -279,5 +279,67 @@ Please find sample output for above application as follows:
 
 ### Complete Example
 
-You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala).
+You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala).
 In this example, Flink DataSet is processed and written to Pulsar in Json format.
+
+
+# PulsarAvroOutputFormat
+### Usage
+
+Please find Scala sample usage of `PulsarAvroOutputFormat` as follows:
+
+```scala
+      val nasaMissions = List(
+          NasaMission.newBuilder.setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build,
+          NasaMission.newBuilder.setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build,
+          NasaMission.newBuilder.setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build,
+          NasaMission.newBuilder.setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build,
+          NasaMission.newBuilder.setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build)
+      
+        def main(args: Array[String]): Unit = {
+      
+          // set up the execution environment
+          val env = ExecutionEnvironment.getExecutionEnvironment
+      
+          // create PulsarAvroOutputFormat instance
+          val pulsarAvroOutputFormat =
+            new PulsarAvroOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+      
+          // create DataSet
+          val textDS = env.fromCollection(nasaMissions)
+      
+          // copy each mission into a new NasaMission record (the name is passed through unchanged)
+          textDS.map(nasaMission => new NasaMission(
+            nasaMission.getId,
+            nasaMission.getName,
+            nasaMission.getStartYear,
+            nasaMission.getEndYear))
+      
+            // filter missions which started after 1970
+            .filter(_.getStartYear > 1970)
+      
+            // write batch data to Pulsar as Avro
+            .output(pulsarAvroOutputFormat)
+      
+          // set parallelism to write Pulsar in parallel (optional)
+          env.setParallelism(2)
+      
+          // execute program
+          env.execute("Flink - Pulsar Batch Avro")
+        }
+```
+
+**Note:** The `NasaMission` class is automatically generated by Avro from the `NasaMission.avsc` schema.
+
+### Sample Output
+
+Please find sample output for above application as follows:
+```
+ "4,SKYLAB,1973,1974"
+ "5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
+```
+
+### Complete Example
+
+You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala).
+In this example, Flink DataSet is processed and written to Pulsar in Avro format.
diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
index b91af16..f32ca81 100644
--- a/pulsar-flink/pom.xml
+++ b/pulsar-flink/pom.xml
@@ -52,6 +52,18 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-scala_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-avro</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-client</artifactId>
       <version>${project.version}</version>
@@ -102,6 +114,8 @@
       <scope>test</scope>
     </dependency>
 
+
+
   </dependencies>
 
   <build>
@@ -111,5 +125,26 @@
         <filtering>true</filtering>
       </resource>
     </resources>
+
+    <plugins>
+      <!-- Generate Test class from avro schema -->
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>${avro.version}</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+            <configuration>
+              <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
+              <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
   </build>
 </project>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
new file mode 100644
index 0000000..d15dfe7
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.flink.batch.connectors.pulsar.serialization.AvroSerializationSchema;
+
+/**
+ * Pulsar Avro Output Format to write Flink DataSets into a Pulsar topic in Avro format.
+ */
+public class PulsarAvroOutputFormat<T extends SpecificRecord> extends BasePulsarOutputFormat<T>
{
+
+    private static final long serialVersionUID = -6794070714728773530L;
+
+    public PulsarAvroOutputFormat(String serviceUrl, String topicName) {
+        super(serviceUrl, topicName);
+        this.serializationSchema = new AvroSerializationSchema();
+    }
+
+}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java
new file mode 100644
index 0000000..693ef8d
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.serialization;
+
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * Avro Serialization Schema to serialize Dataset records to Avro.
+ */
+public class AvroSerializationSchema<T extends SpecificRecord> implements SerializationSchema<T>
{
+
+    private static final long serialVersionUID = -6691140169413760919L;
+
+    @Override
+    public byte[] serialize(T t) {
+        if (null == t) {
+            return null;
+        }
+
+        // Writer to serialize Avro record into a byte array.
+        DatumWriter<T> writer = new SpecificDatumWriter<>(t.getSchema());
+        // Output stream to serialize records into byte array.
+        ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+        // Low-level class for serialization of Avro values.
+        Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+        arrayOutputStream.reset();
+        try {
+            writer.write(t,encoder);
+            encoder.flush();
+        } catch (IOException e) {
+            throw new RuntimeException("Error while serializing the record to Avro", e);
+        }
+
+        return arrayOutputStream.toByteArray();
+    }
+
+}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
new file mode 100644
index 0000000..7013006
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for Pulsar Avro Output Format
+ */
+public class PulsarAvroOutputFormatTest {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarAvroOutputFormatConstructorWhenServiceUrlIsNull() {
+        new PulsarAvroOutputFormat(null, "testTopic");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarAvroOutputFormatConstructorWhenTopicNameIsNull() {
+        new PulsarAvroOutputFormat("testServiceUrl", null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarAvroOutputFormatConstructorWhenTopicNameIsBlank() {
+        new PulsarAvroOutputFormat("testServiceUrl", " ");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarAvroOutputFormatConstructorWhenServiceUrlIsBlank() {
+        new PulsarAvroOutputFormat(" ", "testTopic");
+    }
+
+    @Test
+    public void testPulsarAvroOutputFormatConstructor() {
+        PulsarAvroOutputFormat pulsarAvroOutputFormat =
+                new PulsarAvroOutputFormat("testServiceUrl", "testTopic");
+        assertNotNull(pulsarAvroOutputFormat);
+    }
+}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
new file mode 100644
index 0000000..176acc4
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.serialization;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.batch.connectors.pulsar.avro.generated.NasaMission;
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for Avro Serialization Schema
+ */
+public class AvroSerializationSchemaTest {
+
+    @Test
+    public void testAvroSerializationSchemaWithSuccessfulCase() throws IOException {
+        NasaMission nasaMission = NasaMission.newBuilder().setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build();
+        AvroSerializationSchema schema = new AvroSerializationSchema();
+        byte[] rowBytes = schema.serialize(nasaMission);
+
+        AvroDeserializationSchema<GenericRecord> deserializationSchema = AvroDeserializationSchema.forGeneric(nasaMission.getSchema());
+        GenericRecord genericRecord = deserializationSchema.deserialize(rowBytes);
+
+        assertEquals(nasaMission.getId(), genericRecord.get("id"));
+        assertEquals(nasaMission.getName(), genericRecord.get("name").toString());
+        assertEquals(nasaMission.getStartYear(), genericRecord.get("start_year"));
+        assertEquals(nasaMission.getEndYear(), genericRecord.get("end_year"));
+    }
+
+    @Test
+    public void testAvroSerializationSchemaWithEmptyRecord() throws IOException {
+        NasaMission nasaMission = NasaMission.newBuilder().setId(1).setName("Mercury program").setStartYear(null).setEndYear(null).build();
+        AvroSerializationSchema schema = new AvroSerializationSchema();
+        byte[] rowBytes = schema.serialize(nasaMission);
+
+        AvroDeserializationSchema<GenericRecord> deserializationSchema = AvroDeserializationSchema.forGeneric(nasaMission.getSchema());
+        GenericRecord genericRecord = deserializationSchema.deserialize(rowBytes);
+
+        assertEquals(nasaMission.getId(), genericRecord.get("id"));
+        assertEquals(nasaMission.getName(), genericRecord.get("name").toString());
+        assertEquals(null, genericRecord.get("start_year"));
+        assertEquals(null, genericRecord.get("end_year"));
+    }
+
+}
diff --git a/pulsar-flink/src/test/resources/avro/NasaMission.avsc b/pulsar-flink/src/test/resources/avro/NasaMission.avsc
new file mode 100644
index 0000000..4a669e0
--- /dev/null
+++ b/pulsar-flink/src/test/resources/avro/NasaMission.avsc
@@ -0,0 +1,10 @@
+{"namespace": "org.apache.flink.batch.connectors.pulsar.avro.generated",
+ "type": "record",
+ "name": "NasaMission",
+ "fields": [
+     {"name": "id", "type": "int"},
+     {"name": "name", "type": "string"},
+     {"name": "start_year",  "type": ["int", "null"]},
+     {"name": "end_year", "type": ["int", "null"]}
+ ]
+}


Mime
View raw message