pulsar-commits mailing list archives

From si...@apache.org
Subject [pulsar] branch master updated: [Pulsar-Flink] Add Batch Csv Sink Support (#3039)
Date Thu, 22 Nov 2018 19:32:50 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 182ac65  [Pulsar-Flink] Add Batch Csv Sink Support (#3039)
182ac65 is described below

commit 182ac654f2bf16e5a7602ebf1431d71d8066bbb4
Author: Eren Avsarogullari <erenavsarogullari@gmail.com>
AuthorDate: Thu Nov 22 19:32:45 2018 +0000

    [Pulsar-Flink] Add Batch Csv Sink Support (#3039)
    
    ### Motivation
    This PR aims to add Flink - Pulsar Batch Csv Sink support. If a user works with the Flink DataSet API and would like to write DataSets to Pulsar in Csv format, this sink can help.
    
    This is similar to the approach that Flink currently offers:
    ```
    DataSet<Tuple3<String, Integer, Double>> values = // [...]
    values.writeAsCsv(filepath) // writing Datasets to FileSystem
    ```
    Ref: [Flink Batch Sink API](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#data-sinks)
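    
    With this change, the Pulsar counterpart looks roughly like the following minimal sketch (the DataSet contents, service URL and topic name are placeholder values):
    ```
    DataSet<Tuple3<String, Integer, Double>> values = // [...]
    values.output(new PulsarCsvOutputFormat<>("pulsar://127.0.0.1:6650", "my-flink-topic")); // writing DataSets to a Pulsar topic in Csv format
    ```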
    
    ### Modifications
    Please find the change-set as follows:
    
    **1-** Defines `PulsarCsvOutputFormat` to write Flink Batch `DataSets` into Pulsar by providing a ready-made `CsvSerializationSchema`.
    **2-** Abstracts the current implementation so that both `PulsarOutputFormat` (which supports a user-defined serialization schema) and `PulsarCsvOutputFormat` are supported (see the constructor sketch below).
    **3-** UT coverage
    **4-** `FlinkPulsarBatchCsvSinkExample` to show users how the sink is used.
    **5-** `README.md` documentation
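    
    A rough constructor-level sketch of the resulting API (service URL, topic name and the user-defined schema below are placeholder values):
    ```
    // user-defined serialization schema
    PulsarOutputFormat<String> pulsarOutputFormat =
            new PulsarOutputFormat<>("pulsar://127.0.0.1:6650", "my-flink-topic", text -> text.getBytes());
    
    // ready-made Csv serialization for Flink Tuples
    PulsarCsvOutputFormat<Tuple3<Integer, String, String>> pulsarCsvOutputFormat =
            new PulsarCsvOutputFormat<>("pulsar://127.0.0.1:6650", "my-flink-topic");
    ```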
---
 pulsar-flink/pom.xml                               |  7 ++
 ...tputFormat.java => BasePulsarOutputFormat.java} | 28 ++++----
 .../connectors/pulsar/PulsarCsvOutputFormat.java   | 37 ++++++++++
 .../connectors/pulsar/PulsarOutputFormat.java      | 83 ++--------------------
 .../serialization/CsvSerializationSchema.java      | 58 +++++++++++++++
 ...matTest.java => PulsarCsvOutputFormatTest.java} | 33 +++++----
 .../connectors/pulsar/PulsarOutputFormatTest.java  | 76 ++++++++++++++++++--
 .../example/FlinkPulsarBatchCsvSinkExample.java    | 78 ++++++++++++++++++++
 .../batch/connectors/pulsar/example/README.md      | 79 +++++++++++++++++---
 .../serialization/CsvSerializationSchemaTest.java  | 53 ++++++++++++++
 10 files changed, 410 insertions(+), 122 deletions(-)

diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
index a314b07..b91af16 100644
--- a/pulsar-flink/pom.xml
+++ b/pulsar-flink/pom.xml
@@ -58,6 +58,13 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-csv</artifactId>
+      <version>1.6</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-runtime_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
similarity index 78%
copy from pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
copy to pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
index ac54248..d5f4af5 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
@@ -34,28 +34,26 @@ import java.io.IOException;
 import java.util.function.Function;
 
 /**
- * Flink Batch Sink to write DataSets into a Pulsar topic.
+ * Base Pulsar Output Format to write Flink DataSets into a Pulsar topic.
  */
-public class PulsarOutputFormat<T> extends RichOutputFormat<T> {
+public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T> 
{
 
-    private static final Logger LOG = LoggerFactory.getLogger(PulsarOutputFormat.class);
-
-    private static String serviceUrl;
-    private static String topicName;
-    private SerializationSchema<T> serializationSchema;
+    private static final Logger LOG = LoggerFactory.getLogger(BasePulsarOutputFormat.class);
+    private static final long serialVersionUID = 2304601727522060427L;
 
     private transient Function<Throwable, MessageId> failureCallback;
-
     private static volatile Producer<byte[]> producer;
 
-    public PulsarOutputFormat(String serviceUrl, String topicName, SerializationSchema<T>
serializationSchema) {
-        Preconditions.checkNotNull(serviceUrl, "serviceUrl must not be null.");
-        Preconditions.checkArgument(StringUtils.isNotBlank(topicName),  "topicName must not
be blank.");
-        Preconditions.checkNotNull(serializationSchema,  "serializationSchema must not be
null.");
+    protected static String serviceUrl;
+    protected static String topicName;
+    protected SerializationSchema<T> serializationSchema;
+
+    protected BasePulsarOutputFormat(String serviceUrl, String topicName) {
+        Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot
be blank.");
+        Preconditions.checkArgument(StringUtils.isNotBlank(topicName),  "topicName cannot
be blank.");
 
         this.serviceUrl = serviceUrl;
         this.topicName = topicName;
-        this.serializationSchema = serializationSchema;
 
         LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic {}",
this.topicName);
     }
@@ -92,7 +90,7 @@ public class PulsarOutputFormat<T> extends RichOutputFormat<T>
{
             synchronized (PulsarOutputFormat.class) {
                 if(producer == null){
                     producer = Preconditions.checkNotNull(createPulsarProducer(),
-                                                                "Pulsar producer must not
be null.");
+                            "Pulsar producer cannot be null.");
                 }
             }
         }
@@ -104,7 +102,7 @@ public class PulsarOutputFormat<T> extends RichOutputFormat<T>
{
             PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
             return client.newProducer().topic(topicName).create();
         } catch (PulsarClientException e) {
-            LOG.error("Pulsar producer can not be created.", e);
+            LOG.error("Pulsar producer cannot be created.", e);
             throw e;
         }
     }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
new file mode 100644
index 0000000..d6aecda
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.batch.connectors.pulsar.serialization.CsvSerializationSchema;
+
+/**
+ * Pulsar Csv Output Format to write Flink DataSets into a Pulsar topic in Csv format.
+ */
+public class PulsarCsvOutputFormat<T extends Tuple> extends BasePulsarOutputFormat<T>
{
+
+    private static final long serialVersionUID = -4461671510903404196L;
+
+    public PulsarCsvOutputFormat(String serviceUrl, String topicName) {
+        super(serviceUrl, topicName);
+
+        this.serializationSchema = new CsvSerializationSchema<>();
+    }
+
+}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
index ac54248..8b46977 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -18,94 +18,21 @@
  */
 package org.apache.flink.batch.connectors.pulsar;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Preconditions;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.function.Function;
 
 /**
- * Flink Batch Sink to write DataSets into a Pulsar topic.
+ * Pulsar Output Format to write Flink DataSets into a Pulsar topic in user-defined format.
  */
-public class PulsarOutputFormat<T> extends RichOutputFormat<T> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PulsarOutputFormat.class);
-
-    private static String serviceUrl;
-    private static String topicName;
-    private SerializationSchema<T> serializationSchema;
+public class PulsarOutputFormat<T> extends BasePulsarOutputFormat<T> {
 
-    private transient Function<Throwable, MessageId> failureCallback;
-
-    private static volatile Producer<byte[]> producer;
+    private static final long serialVersionUID = 2997027580167793000L;
 
     public PulsarOutputFormat(String serviceUrl, String topicName, SerializationSchema<T>
serializationSchema) {
-        Preconditions.checkNotNull(serviceUrl, "serviceUrl must not be null.");
-        Preconditions.checkArgument(StringUtils.isNotBlank(topicName),  "topicName must not
be blank.");
-        Preconditions.checkNotNull(serializationSchema,  "serializationSchema must not be
null.");
+        super(serviceUrl, topicName);
+        Preconditions.checkNotNull(serializationSchema,  "serializationSchema cannot be null.");
 
-        this.serviceUrl = serviceUrl;
-        this.topicName = topicName;
         this.serializationSchema = serializationSchema;
-
-        LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic {}",
this.topicName);
     }
 
-    @Override
-    public void configure(Configuration configuration) {
-
-    }
-
-    @Override
-    public void open(int taskNumber, int numTasks) throws IOException {
-        this.producer = getProducerInstance();
-
-        this.failureCallback = cause -> {
-            LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
-            return null;
-        };
-    }
-
-    @Override
-    public void writeRecord(T t) throws IOException {
-        byte[] data = this.serializationSchema.serialize(t);
-        this.producer.sendAsync(data)
-                .exceptionally(this.failureCallback);
-    }
-
-    @Override
-    public void close() throws IOException {
-
-    }
-
-    private static Producer<byte[]> getProducerInstance() throws PulsarClientException
{
-        if(producer == null){
-            synchronized (PulsarOutputFormat.class) {
-                if(producer == null){
-                    producer = Preconditions.checkNotNull(createPulsarProducer(),
-                                                                "Pulsar producer must not
be null.");
-                }
-            }
-        }
-        return producer;
-    }
-
-    private static Producer<byte[]> createPulsarProducer() throws PulsarClientException
{
-        try {
-            PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
-            return client.newProducer().topic(topicName).create();
-        } catch (PulsarClientException e) {
-            LOG.error("Pulsar producer can not be created.", e);
-            throw e;
-        }
-    }
 }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
new file mode 100644
index 0000000..c01cba3
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.serialization;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+/**
+ * Csv Serialization Schema to serialize Tuples to Csv.
+ */
+public class CsvSerializationSchema<T extends Tuple> implements SerializationSchema<T>
{
+
+    private static final Logger LOG = LoggerFactory.getLogger(CsvSerializationSchema.class);
+    private static final long serialVersionUID = -3379119592495232636L;
+
+    private static final int STRING_WRITER_INITIAL_BUFFER_SIZE = 256;
+
+    @Override
+    public byte[] serialize(T t) {
+        StringWriter stringWriter = null;
+        try {
+            Object[] fieldsValues = new Object[t.getArity()];
+            for(int index = 0; index < t.getArity(); index++) {
+                fieldsValues[index] = (t.getField(index));
+            }
+
+            stringWriter = new StringWriter(STRING_WRITER_INITIAL_BUFFER_SIZE);
+            CSVFormat.DEFAULT.withRecordSeparator("").printRecord(stringWriter, fieldsValues);
+        } catch (IOException e) {
+            LOG.error("Error while serializing the record to Csv", e);
+        }
+
+        return stringWriter.toString().getBytes();
+    }
+
+}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
similarity index 50%
copy from pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
copy to pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
index 5639709..175c224 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
@@ -23,35 +23,34 @@ import org.junit.Test;
 import static org.junit.Assert.assertNotNull;
 
 /**
- * Tests for PulsarOutputFormat
+ * Tests for Pulsar Csv Output Format
  */
-public class PulsarOutputFormatTest {
+public class PulsarCsvOutputFormatTest {
 
-    @Test(expected = NullPointerException.class)
-    public void testPulsarOutputFormatConstructorWhenServiceUrlIsNull() {
-        new PulsarOutputFormat(null, "testTopic", text -> text.toString().getBytes());
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarCsvOutputFormatConstructorWhenServiceUrlIsNull() {
+        new PulsarCsvOutputFormat(null, "testTopic");
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void testPulsarOutputFormatConstructorWhenTopicNameIsNull() {
-        new PulsarOutputFormat("testServiceUrl", null, text -> text.toString().getBytes());
+    public void testPulsarCsvOutputFormatConstructorWhenTopicNameIsNull() {
+        new PulsarCsvOutputFormat("testServiceUrl", null);
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void testPulsarOutputFormatConstructorWhenTopicNameIsBlank() {
-        new PulsarOutputFormat("testServiceUrl", " ", text -> text.toString().getBytes());
+    public void testPulsarCsvOutputFormatConstructorWhenTopicNameIsBlank() {
+        new PulsarCsvOutputFormat("testServiceUrl", " ");
     }
 
-    @Test(expected = NullPointerException.class)
-    public void testPulsarOutputFormatConstructorWhenSerializationSchemaIsNull() {
-        new PulsarOutputFormat("testServiceUrl", "testTopic", null);
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarCsvOutputFormatConstructorWhenServiceUrlIsBlank() {
+        new PulsarCsvOutputFormat(" ", "testTopic");
     }
 
     @Test
-    public void testPulsarOutputFormatConstructor() {
-        PulsarOutputFormat pulsarOutputFormat =
-                new PulsarOutputFormat("testServiceUrl", "testTopic", text -> text.toString().getBytes());
-        assertNotNull(pulsarOutputFormat);
+    public void testPulsarCsvOutputFormatConstructor() {
+        PulsarCsvOutputFormat pulsarCsvOutputFormat =
+                new PulsarCsvOutputFormat("testServiceUrl", "testTopic");
+        assertNotNull(pulsarCsvOutputFormat);
     }
-
 }
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
index 5639709..9da4de0 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
@@ -18,16 +18,22 @@
  */
 package org.apache.flink.batch.connectors.pulsar;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 /**
- * Tests for PulsarOutputFormat
+ * Tests for Pulsar Output Format
  */
 public class PulsarOutputFormatTest {
 
-    @Test(expected = NullPointerException.class)
+    @Test(expected = IllegalArgumentException.class)
     public void testPulsarOutputFormatConstructorWhenServiceUrlIsNull() {
         new PulsarOutputFormat(null, "testTopic", text -> text.toString().getBytes());
     }
@@ -42,16 +48,78 @@ public class PulsarOutputFormatTest {
         new PulsarOutputFormat("testServiceUrl", " ", text -> text.toString().getBytes());
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarOutputFormatConstructorWhenServiceUrlIsBlank() {
+        new PulsarOutputFormat(" ", "testTopic", text -> text.toString().getBytes());
+    }
+
     @Test(expected = NullPointerException.class)
     public void testPulsarOutputFormatConstructorWhenSerializationSchemaIsNull() {
         new PulsarOutputFormat("testServiceUrl", "testTopic", null);
     }
 
     @Test
-    public void testPulsarOutputFormatConstructor() {
+    public void testPulsarOutputFormatWithStringSerializationSchema() throws IOException
{
+        String input = "Wolfgang Amadeus Mozart";
+        PulsarOutputFormat pulsarOutputFormat =
+                new PulsarOutputFormat("testServiceUrl", "testTopic",
+                        text -> text.toString().getBytes());
+        assertNotNull(pulsarOutputFormat);
+        byte[] bytes = pulsarOutputFormat.serializationSchema.serialize(input);
+        String resultString = IOUtils.toString(bytes, StandardCharsets.UTF_8.toString());
+        assertEquals(input, resultString);
+    }
+
+    @Test
+    public void testPulsarOutputFormatWithCustomSerializationSchema() throws IOException
{
+        Employee employee = new Employee(1, "Test Employee", "Test Department");
         PulsarOutputFormat pulsarOutputFormat =
-                new PulsarOutputFormat("testServiceUrl", "testTopic", text -> text.toString().getBytes());
+                new PulsarOutputFormat("testServiceUrl", "testTopic",
+                        new EmployeeSerializationSchema());
         assertNotNull(pulsarOutputFormat);
+
+        byte[] bytes = pulsarOutputFormat.serializationSchema.serialize(employee);
+        String resultString = IOUtils.toString(bytes, StandardCharsets.UTF_8.toString());
+        assertEquals(employee.toString(), resultString);
+    }
+
+    /**
+     * Employee Serialization Schema.
+     */
+    private class EmployeeSerializationSchema implements SerializationSchema<Employee>
{
+
+        @Override
+        public byte[] serialize(Employee employee) {
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(employee.id);
+            stringBuilder.append(" - ");
+            stringBuilder.append(employee.name);
+            stringBuilder.append(" - ");
+            stringBuilder.append(employee.department);
+
+            return stringBuilder.toString().getBytes();
+        }
+    }
+
+    /**
+     * Data type for Employee Model.
+     */
+    private class Employee {
+
+        public long id;
+        public String name;
+        public String department;
+
+        public Employee(long id, String name, String department) {
+            this.id = id;
+            this.name = name;
+            this.department = department;
+        }
+
+        @Override
+        public String toString() {
+            return id + " - " + name + " - " + department;
+        }
     }
 
 }
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
new file mode 100644
index 0000000..0d7281a
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.example;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Implements a batch program on Pulsar topic by writing Flink DataSet as Csv.
+ */
+public class FlinkPulsarBatchCsvSinkExample {
+
+    private static final List<Tuple4<Integer, String, String, String>> employeeTuples
= Arrays.asList(
+            new Tuple4(1, "John", "Tyson", "Engineering"),
+            new Tuple4(2, "Pamela", "Moon", "HR"),
+            new Tuple4(3, "Jim", "Sun", "Finance"),
+            new Tuple4(4, "Michael", "Star", "Engineering"));
+
+    private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
+    private static final String TOPIC_NAME = "my-flink-topic";
+
+    public static void main(String[] args) throws Exception {
+
+        // set up the execution environment
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        // create PulsarCsvOutputFormat instance
+        final OutputFormat<Tuple4<Integer, String, String, String>> pulsarCsvOutputFormat
=
+                new PulsarCsvOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+
+        // create DataSet
+        DataSet<Tuple4<Integer, String, String, String>> employeeDS = env.fromCollection(employeeTuples);
+        // map employees' name, surname and department as upper-case
+        employeeDS.map(
+                new MapFunction<Tuple4<Integer, String, String, String>, Tuple4<Integer,
String, String, String>>() {
+            @Override
+            public Tuple4<Integer, String, String, String> map(
+                    Tuple4<Integer, String, String, String> employeeTuple) throws Exception
{
+                return new Tuple4(employeeTuple.f0,
+                        employeeTuple.f1.toUpperCase(),
+                        employeeTuple.f2.toUpperCase(),
+                        employeeTuple.f3.toUpperCase());
+            }
+        })
+        // filter employees who are members of Engineering
+        .filter(tuple -> tuple.f3.equals("ENGINEERING"))
+        // write batch data to Pulsar
+        .output(pulsarCsvOutputFormat);
+
+        // execute program
+        env.execute("Flink - Pulsar Batch Csv");
+
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
index f3baf00..9942ba4 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -1,10 +1,10 @@
 The Flink Batch Sink for Pulsar is a custom sink that enables Apache [Flink](https://flink.apache.org/)
to write [DataSet](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/index.html)
to Pulsar.
 
-## Prerequisites
+# Prerequisites
 
 To use this sink, include a dependency for the `pulsar-flink` library in your Java configuration.
 
-### Maven
+# Maven
 
 If you're using Maven, add this to your `pom.xml`:
 
@@ -20,7 +20,7 @@ If you're using Maven, add this to your `pom.xml`:
 </dependency>
 ```
 
-### Gradle
+# Gradle
 
 If you're using Gradle, add this to your `build.gradle` file:
 
@@ -32,7 +32,8 @@ dependencies {
 }
 ```
 
-## Usage
+# PulsarOutputFormat
+### Usage
 
 Please find a sample usage as follows:
 
@@ -75,7 +76,7 @@ Please find a sample usage as follows:
         }
 ```
 
-## Sample Output
+### Sample Output
 
 Please find sample output for above application as follows:
 ```
@@ -89,12 +90,12 @@ encircles
 world
 ```
 
-## Complete Example
+### Complete Example
 
 You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java).
 In this example, a Flink DataSet is processed as a word count and written to Pulsar.
 
-## Complete Example Output
+### Complete Example Output
 Please find sample output for above linked application as follows:
 ```
 WordWithCount { word = important, count = 1 }
@@ -103,4 +104,66 @@ WordWithCount { word = imagination, count = 2 }
 WordWithCount { word = knowledge, count = 2 }
 WordWithCount { word = limited, count = 1 }
 WordWithCount { word = world, count = 1 }
-```
\ No newline at end of file
+```
+
+# PulsarCsvOutputFormat
+### Usage
+
+Please find a sample usage as follows:
+
+```java
+        private static final List<Tuple4<Integer, String, String, String>> employeeTuples
= Arrays.asList(
+            new Tuple4(1, "John", "Tyson", "Engineering"),
+            new Tuple4(2, "Pamela", "Moon", "HR"),
+            new Tuple4(3, "Jim", "Sun", "Finance"),
+            new Tuple4(4, "Michael", "Star", "Engineering"));
+
+        private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
+        private static final String TOPIC_NAME = "my-flink-topic";
+
+        public static void main(String[] args) throws Exception {
+
+            // set up the execution environment
+            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+            // create PulsarCsvOutputFormat instance
+            final OutputFormat<Tuple4<Integer, String, String, String>> pulsarCsvOutputFormat
=
+                    new PulsarCsvOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+
+            // create DataSet
+            DataSet<Tuple4<Integer, String, String, String>> employeeDS = env.fromCollection(employeeTuples);
+            // map employees' name, surname and department as upper-case
+            employeeDS.map(
+                    new MapFunction<Tuple4<Integer, String, String, String>, Tuple4<Integer,
String, String, String>>() {
+                @Override
+                public Tuple4<Integer, String, String, String> map(
+                        Tuple4<Integer, String, String, String> employeeTuple) throws
Exception {
+                    return new Tuple4(employeeTuple.f0,
+                            employeeTuple.f1.toUpperCase(),
+                            employeeTuple.f2.toUpperCase(),
+                            employeeTuple.f3.toUpperCase());
+                }
+            })
+            // filter employees who are members of Engineering
+            .filter(tuple -> tuple.f3.equals("ENGINEERING"))
+            // write batch data to Pulsar
+            .output(pulsarCsvOutputFormat);
+
+            // execute program
+            env.execute("Flink - Pulsar Batch Csv");
+
+        }
+```
+
+### Sample Output
+
+Please find sample output for above application as follows:
+```
+1,JOHN,TYSON,ENGINEERING
+4,MICHAEL,STAR,ENGINEERING
+```
+
+### Complete Example
+
+You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java).
+In this example, a Flink DataSet of employee Tuples is mapped to upper-case, filtered by department and written to Pulsar in Csv format.
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchemaTest.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchemaTest.java
new file mode 100644
index 0000000..1c8764b
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchemaTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.serialization;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for Csv Serialization Schema
+ */
+public class CsvSerializationSchemaTest {
+
+    @Test
+    public void testCsvSerializationSchemaWithSuccessfulCase() throws IOException {
+        Tuple3<Integer, String, String> employee = new Tuple3(1, "Wolfgang Amadeus",
"Mozart");
+        CsvSerializationSchema schema = new CsvSerializationSchema();
+        byte[] rowBytes = schema.serialize(employee);
+        String csvContent = IOUtils.toString(rowBytes, StandardCharsets.UTF_8.toString());
+        assertEquals(csvContent, "1,Wolfgang Amadeus,Mozart");
+    }
+
+    @Test
+    public void testCsvSerializationSchemaWithEmptyRecord() throws IOException {
+        Tuple3<Integer, String, String> employee = new Tuple3();
+        CsvSerializationSchema schema = new CsvSerializationSchema();
+        byte[] employeeBytes = schema.serialize(employee);
+        String str = IOUtils.toString(employeeBytes, StandardCharsets.UTF_8.toString());
+        assertEquals(str, ",,");
+    }
+
+}

