pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: issue#3939 : Allow client authentication from pulsar-flink package (#3949)
Date Tue, 02 Apr 2019 16:02:06 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8adff85  issue#3939 : Allow client authentication from pulsar-flink package (#3949)
8adff85 is described below

commit 8adff85d8fece38428f28c1288c0ebb7dde3a744
Author: Shivji Kumar Jha <shiv4289@gmail.com>
AuthorDate: Tue Apr 2 21:32:01 2019 +0530

    issue#3939 : Allow client authentication from pulsar-flink package (#3949)
    
    Problem:
    ========
    pulsar-flink module (aka flink connector) internally uses pulsar-client. Though the pulsar client allows setting tokens in the client builder, the flink connector does not provide a way to pass authentication token to the pulsar client it uses internally.
    
    Solution:
    ========
    Accept authetication information as an input in pulsar-flink module. Pass this authentication information to pulsar-client.
---
 .../example/FlinkPulsarBatchAvroSinkExample.java   |  3 +-
 .../example/FlinkPulsarBatchCsvSinkExample.java    |  3 +-
 .../example/FlinkPulsarBatchJsonSinkExample.java   |  3 +-
 .../example/FlinkPulsarBatchSinkExample.java       |  3 +-
 .../example/PulsarConsumerSourceWordCount.java     |  2 +
 ...lsarConsumerSourceWordCountToAvroTableSink.java |  3 +-
 ...lsarConsumerSourceWordCountToJsonTableSink.java |  3 +-
 .../FlinkPulsarBatchAvroSinkScalaExample.scala     |  3 +-
 .../FlinkPulsarBatchCsvSinkScalaExample.scala      |  3 +-
 .../FlinkPulsarBatchJsonSinkScalaExample.scala     |  3 +-
 .../example/FlinkPulsarBatchSinkScalaExample.scala |  3 +-
 .../connectors/pulsar/BasePulsarOutputFormat.java  | 17 +++---
 .../connectors/pulsar/PulsarAvroOutputFormat.java  |  5 +-
 .../connectors/pulsar/PulsarCsvOutputFormat.java   |  5 +-
 .../connectors/pulsar/PulsarJsonOutputFormat.java  |  5 +-
 .../connectors/pulsar/PulsarOutputFormat.java      |  5 +-
 .../connectors/pulsar/FlinkPulsarProducer.java     | 14 ++++-
 .../connectors/pulsar/PulsarAvroTableSink.java     |  8 ++-
 .../connectors/pulsar/PulsarConsumerSource.java    |  4 ++
 .../connectors/pulsar/PulsarJsonTableSink.java     |  7 ++-
 .../connectors/pulsar/PulsarSourceBuilder.java     | 61 ++++++++++++++++++++++
 .../connectors/pulsar/PulsarTableSink.java         |  5 ++
 .../pulsar/PulsarAvroOutputFormatTest.java         | 11 ++--
 .../pulsar/PulsarCsvOutputFormatTest.java          | 11 ++--
 .../pulsar/PulsarJsonOutputFormatTest.java         | 11 ++--
 .../connectors/pulsar/PulsarOutputFormatTest.java  | 15 +++---
 .../connectors/pulsar/PulsarAvroTableSinkTest.java |  6 ++-
 .../connectors/pulsar/PulsarJsonTableSinkTest.java |  6 ++-
 28 files changed, 174 insertions(+), 54 deletions(-)

diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
index 1349dba..6d077f9 100644
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
+++ b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.avro.generated.NasaMission;
 import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 
 import java.util.Arrays;
 import java.util.List;
@@ -63,7 +64,7 @@ public class FlinkPulsarBatchAvroSinkExample {
         System.out.println("\tTopic:\t" + topic);
 
         // create PulsarAvroOutputFormat instance
-        final OutputFormat<NasaMission> pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(serviceUrl, topic);
+        final OutputFormat<NasaMission> pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled());
 
         // create DataSet
         DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
index 3e658dc..4abb0a4 100644
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
+++ b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 
 import java.util.Arrays;
 import java.util.List;
@@ -65,7 +66,7 @@ public class FlinkPulsarBatchCsvSinkExample {
 
         // create PulsarCsvOutputFormat instance
         final OutputFormat<Tuple4<Integer, String, Integer, Integer>> pulsarCsvOutputFormat =
-                new PulsarCsvOutputFormat<>(serviceUrl, topic);
+                new PulsarCsvOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled());
 
         // create DataSet
         DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS = env.fromCollection(nasaMissions);
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
index 3937ae9..dc56364 100644
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
+++ b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 
 import java.util.Arrays;
 import java.util.List;
@@ -62,7 +63,7 @@ public class FlinkPulsarBatchJsonSinkExample {
         System.out.println("\tTopic:\t" + topic);
 
         // create PulsarJsonOutputFormat instance
-        final OutputFormat<NasaMission> pulsarJsonOutputFormat = new PulsarJsonOutputFormat<>(serviceUrl, topic);
+        final OutputFormat<NasaMission> pulsarJsonOutputFormat = new PulsarJsonOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled());
 
         // create DataSet
         DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
index c90d016..2c89579 100644
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
+++ b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat;
 import org.apache.flink.util.Collector;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 
 /**
  * Implements a batch word-count program on Pulsar topic by writing Flink DataSet.
@@ -60,7 +61,7 @@ public class FlinkPulsarBatchSinkExample {
 
         // create PulsarOutputFormat instance
         final OutputFormat pulsarOutputFormat =
-                new PulsarOutputFormat(serviceUrl, topic, wordWithCount -> wordWithCount.toString().getBytes());
+                new PulsarOutputFormat(serviceUrl, topic, new AuthenticationDisabled(), wordWithCount -> wordWithCount.toString().getBytes());
 
         // create DataSet
         DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
index 942ddc1..a323f8f 100644
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
+++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
 import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 
 /**
  * Implements a streaming wordcount program on pulsar topics.
@@ -97,6 +98,7 @@ public class PulsarConsumerSourceWordCount {
             wc.addSink(new FlinkPulsarProducer<>(
                 serviceUrl,
                 outputTopic,
+                new AuthenticationDisabled(),
                 wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
                 wordWithCount -> wordWithCount.word
             )).setParallelism(parallelism);
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
index 9fdc9a2..84e85e8 100644
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
+++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sinks.TableSink;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 
 /**
  * Implements a streaming wordcount program on pulsar topics.
@@ -107,7 +108,7 @@ public class PulsarConsumerSourceWordCountToAvroTableSink {
         table.printSchema();
         TableSink sink = null;
         if (null != outputTopic) {
-            sink = new PulsarAvroTableSink(serviceUrl, outputTopic, ROUTING_KEY, WordWithCount.class);
+            sink = new PulsarAvroTableSink(serviceUrl, outputTopic, new AuthenticationDisabled(), ROUTING_KEY, WordWithCount.class);
         } else {
             // print the results with a csv file
             sink = new CsvTableSink("./examples/file",  "|");
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
index 1be9dde..a4f9c3c 100644
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
+++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sinks.TableSink;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 
 /**
  * Implements a streaming wordcount program on pulsar topics.
@@ -108,7 +109,7 @@ public class PulsarConsumerSourceWordCountToJsonTableSink {
         table.printSchema();
         TableSink sink = null;
         if (null != outputTopic) {
-            sink = new PulsarJsonTableSink(serviceUrl, outputTopic, ROUTING_KEY);
+            sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new AuthenticationDisabled(), ROUTING_KEY);
         } else {
             // print the results with a csv file
             sink = new CsvTableSink("./examples/file",  "|");
diff --git a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
index f10d6c1..a64e656 100644
--- a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
+++ b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.avro.generated.NasaMission
 import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled
 
 /**
   * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Avro.
@@ -59,7 +60,7 @@ object FlinkPulsarBatchAvroSinkScalaExample {
 
     // create PulsarCsvOutputFormat instance
     val pulsarAvroOutputFormat =
-      new PulsarAvroOutputFormat[NasaMission](serviceUrl, topic)
+      new PulsarAvroOutputFormat[NasaMission](serviceUrl, topic, new AuthenticationDisabled())
 
     // create DataSet
     val textDS = env.fromCollection(nasaMissions)
diff --git a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
index 3233616..302d0ab 100644
--- a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
+++ b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple4
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled
 
 /**
   * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Csv.
@@ -65,7 +66,7 @@ object FlinkPulsarBatchCsvSinkScalaExample {
 
     // create PulsarCsvOutputFormat instance
     val pulsarCsvOutputFormat =
-      new PulsarCsvOutputFormat[NasaMission](serviceUrl, topic)
+      new PulsarCsvOutputFormat[NasaMission](serviceUrl, topic, new AuthenticationDisabled())
 
     // create DataSet
     val textDS = env.fromCollection(nasaMissions)
diff --git a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
index 60d02e5..9518751 100644
--- a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
+++ b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat
 import scala.beans.BeanProperty
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled
 
 /**
   * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Json.
@@ -66,7 +67,7 @@ object FlinkPulsarBatchJsonSinkScalaExample {
     println("\tTopic:\t" + topic)
 
     // create PulsarJsonOutputFormat instance
-    val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](serviceUrl, topic)
+    val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](serviceUrl, topic, new AuthenticationDisabled())
 
     // create DataSet
     val nasaMissionDS = env.fromCollection(nasaMissions)
diff --git a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
index 4de0dcb..369e56d 100644
--- a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
+++ b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat
 import org.apache.flink.util.Collector
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled
 
 /**
   * Data type for words with count.
@@ -63,7 +64,7 @@ object FlinkPulsarBatchSinkScalaExample {
 
     // create PulsarOutputFormat instance
     val pulsarOutputFormat =
-      new PulsarOutputFormat[WordWithCount](serviceUrl, topic, new SerializationSchema[WordWithCount] {
+      new PulsarOutputFormat[WordWithCount](serviceUrl, topic, new AuthenticationDisabled(), new SerializationSchema[WordWithCount] {
         override def serialize(wordWithCount: WordWithCount): Array[Byte] = wordWithCount.toString.getBytes
       })
 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
index ca34327..644c8e9 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Authentication;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,14 +47,16 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T>  {
 
     protected final String serviceUrl;
     protected final String topicName;
+    private final Authentication authentication;
     protected SerializationSchema<T> serializationSchema;
 
-    protected BasePulsarOutputFormat(final String serviceUrl, final String topicName) {
+    protected BasePulsarOutputFormat(final String serviceUrl, final String topicName, final Authentication authentication) {
         Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank.");
         Preconditions.checkArgument(StringUtils.isNotBlank(topicName),  "topicName cannot be blank.");
 
         this.serviceUrl = serviceUrl;
         this.topicName = topicName;
+        this.authentication = authentication;
 
         LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.topicName);
     }
@@ -65,7 +68,7 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T>  {
 
     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
-        this.producer = getProducerInstance(serviceUrl, topicName);
+        this.producer = getProducerInstance(serviceUrl, topicName, authentication);
 
         this.failureCallback = cause -> {
             LOG.error("Error while sending record to Pulsar: " + cause.getMessage(), cause);
@@ -85,11 +88,12 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T>  {
 
     }
 
-    private static Producer<byte[]> getProducerInstance(String serviceUrl, String topicName) throws PulsarClientException {
+    private static Producer<byte[]> getProducerInstance(String serviceUrl, String topicName, Authentication authentication)
+            throws PulsarClientException {
         if(producer == null){
             synchronized (PulsarOutputFormat.class) {
                 if(producer == null){
-                    producer = Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName),
+                    producer = Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName, authentication),
                             "Pulsar producer cannot be null.");
                 }
             }
@@ -97,9 +101,10 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T>  {
         return producer;
     }
 
-    private static Producer<byte[]> createPulsarProducer(String serviceUrl, String topicName) throws PulsarClientException {
+    private static Producer<byte[]> createPulsarProducer(String serviceUrl, String topicName, Authentication authentication)
+            throws PulsarClientException {
         try {
-            PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+            PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
             return client.newProducer().topic(topicName).create();
         } catch (PulsarClientException e) {
             LOG.error("Pulsar producer cannot be created.", e);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
index d15dfe7..52484ef 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
@@ -20,6 +20,7 @@ package org.apache.flink.batch.connectors.pulsar;
 
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.flink.batch.connectors.pulsar.serialization.AvroSerializationSchema;
+import org.apache.pulsar.client.api.Authentication;
 
 /**
  * Pulsar Avro Output Format to write Flink DataSets into a Pulsar topic in Avro format.
@@ -28,8 +29,8 @@ public class PulsarAvroOutputFormat<T extends SpecificRecord> extends BasePulsar
 
     private static final long serialVersionUID = -6794070714728773530L;
 
-    public PulsarAvroOutputFormat(String serviceUrl, String topicName) {
-        super(serviceUrl, topicName);
+    public PulsarAvroOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
+        super(serviceUrl, topicName, authentication);
         this.serializationSchema = new AvroSerializationSchema();
     }
 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
index adae9f7..d36a260 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
@@ -20,6 +20,7 @@ package org.apache.flink.batch.connectors.pulsar;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.batch.connectors.pulsar.serialization.CsvSerializationSchema;
+import org.apache.pulsar.client.api.Authentication;
 
 /**
  * Pulsar Csv Output Format to write Flink DataSets into a Pulsar topic in Csv format.
@@ -28,8 +29,8 @@ public class PulsarCsvOutputFormat<T extends Tuple> extends BasePulsarOutputForm
 
     private static final long serialVersionUID = -4461671510903404196L;
 
-    public PulsarCsvOutputFormat(String serviceUrl, String topicName) {
-        super(serviceUrl, topicName);
+    public PulsarCsvOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
+        super(serviceUrl, topicName, authentication);
         this.serializationSchema = new CsvSerializationSchema<>();
     }
 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
index 3fe5baa..96d7a01 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.flink.batch.connectors.pulsar;
 
 import org.apache.flink.batch.connectors.pulsar.serialization.JsonSerializationSchema;
+import org.apache.pulsar.client.api.Authentication;
 
 /**
  * Pulsar Json Output Format to write Flink DataSets into a Pulsar topic in Json format.
@@ -27,8 +28,8 @@ public class PulsarJsonOutputFormat<T> extends BasePulsarOutputFormat<T> {
 
     private static final long serialVersionUID = 8499620770848461958L;
 
-    public PulsarJsonOutputFormat(String serviceUrl, String topicName) {
-        super(serviceUrl, topicName);
+    public PulsarJsonOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
+        super(serviceUrl, topicName, authentication);
         this.serializationSchema = new JsonSerializationSchema();
     }
 }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
index 889970f..393faaf 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -20,6 +20,7 @@ package org.apache.flink.batch.connectors.pulsar;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.Authentication;
 
 /**
  * Pulsar Output Format to write Flink DataSets into a Pulsar topic in user-defined format.
@@ -28,8 +29,8 @@ public class PulsarOutputFormat<T> extends BasePulsarOutputFormat<T> {
 
     private static final long serialVersionUID = 2997027580167793000L;
 
-    public PulsarOutputFormat(String serviceUrl, String topicName, final SerializationSchema<T> serializationSchema) {
-        super(serviceUrl, topicName);
+    public PulsarOutputFormat(String serviceUrl, String topicName, Authentication authentication, final SerializationSchema<T> serializationSchema) {
+        super(serviceUrl, topicName, authentication);
         Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
         this.serializationSchema = serializationSchema;
     }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index c0d3905..55eb619 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,6 +66,12 @@ public class FlinkPulsarProducer<IN>
     protected final String defaultTopicName;
 
     /**
+     * Pulsar client will use this authentication information, if required.
+     */
+    private final Authentication authentication;
+
+
+    /**
      * (Serializable) SerializationSchema for turning objects used with Flink into.
      * byte[] for Pulsar.
      */
@@ -121,13 +128,15 @@ public class FlinkPulsarProducer<IN>
 
     public FlinkPulsarProducer(String serviceUrl,
                                String defaultTopicName,
+                               Authentication authentication,
                                SerializationSchema<IN> serializationSchema,
                                PulsarKeyExtractor<IN> keyExtractor) {
-        this(serviceUrl, defaultTopicName, serializationSchema, keyExtractor, null);
+        this(serviceUrl, defaultTopicName, authentication, serializationSchema, keyExtractor, null);
     }
 
     public FlinkPulsarProducer(String serviceUrl,
                                String defaultTopicName,
+                               Authentication authentication,
                                SerializationSchema<IN> serializationSchema,
                                PulsarKeyExtractor<IN> keyExtractor,
                                Map<String, Object> producerConfig) {
@@ -135,6 +144,7 @@ public class FlinkPulsarProducer<IN>
         checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
         this.serviceUrl = serviceUrl;
         this.defaultTopicName = defaultTopicName;
+        this.authentication = authentication;
         this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
         this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
         ClosureCleaner.ensureSerializable(serializationSchema);
@@ -190,7 +200,7 @@ public class FlinkPulsarProducer<IN>
     }
 
     private Producer<byte[]> createProducer() throws Exception {
-        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
         ProducerBuilder<byte[]> producerBuilder = client.newProducer();
         if (producerConfig != null) {
             producerBuilder = producerBuilder.loadConf(producerConfig);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
index b370345..20999fd 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtract
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
 
 /**
  * An append-only table sink to emit a streaming table as a Pulsar stream that serializes data in Avro format.
@@ -44,6 +45,7 @@ public class PulsarAvroTableSink implements AppendStreamTableSink<Row> {
 
     protected final String serviceUrl;
     protected final String topic;
+    protected final Authentication authentication;
     protected final String routingKeyFieldName;
     protected SerializationSchema<Row> serializationSchema;
     protected String[] fieldNames;
@@ -56,16 +58,17 @@ public class PulsarAvroTableSink implements AppendStreamTableSink<Row> {
      *
      * @param serviceUrl          pulsar service url
      * @param topic               topic in pulsar to which table is written
-     * @param producerConf        producer configuration
      * @param routingKeyFieldName routing key field name
      */
     public PulsarAvroTableSink(
             String serviceUrl,
             String topic,
+            Authentication authentication,
             String routingKeyFieldName,
             Class<? extends SpecificRecord> recordClazz) {
         this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
         this.topic = checkNotNull(topic, "Topic is null");
+        this.authentication = checkNotNull(authentication, "authentication is null, set new AuthenticationDisabled() instead");
         this.routingKeyFieldName = routingKeyFieldName;
         this.recordClazz = recordClazz;
     }
@@ -78,6 +81,7 @@ public class PulsarAvroTableSink implements AppendStreamTableSink<Row> {
         return new FlinkPulsarProducer<Row>(
                 serviceUrl,
                 topic,
+                authentication,
                 serializationSchema,
                 keyExtractor);
     }
@@ -110,7 +114,7 @@ public class PulsarAvroTableSink implements AppendStreamTableSink<Row> {
 
     @Override
     public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-        PulsarAvroTableSink sink = new PulsarAvroTableSink(serviceUrl, topic, routingKeyFieldName, recordClazz);
+        PulsarAvroTableSink sink = new PulsarAvroTableSink(serviceUrl, topic, authentication, routingKeyFieldName, recordClazz);
 
         sink.fieldNames = checkNotNull(fieldNames, "Field names are null");
         sink.fieldTypes = checkNotNull(fieldTypes, "Field types are null");
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 6479bf0..5af82bc 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.Authentication;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +59,7 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
     private final int messageReceiveTimeoutMs = 100;
     private final String serviceUrl;
     private final Set<String> topicNames;
+    private final Authentication authentication;
     private final Pattern topicsPattern;
     private final String subscriptionName;
     private final DeserializationSchema<T> deserializer;
@@ -75,6 +77,7 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
     PulsarConsumerSource(PulsarSourceBuilder<T> builder) {
         super(MessageId.class);
         this.serviceUrl = builder.serviceUrl;
+        this.authentication = builder.authentication;
         this.topicNames = builder.topicNames;
         this.topicsPattern = builder.topicsPattern;
         this.deserializer = builder.deserializationSchema;
@@ -191,6 +194,7 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
     PulsarClient createClient() throws PulsarClientException {
         return PulsarClient.builder()
             .serviceUrl(serviceUrl)
+            .authentication(authentication)
             .build();
     }
 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
index 45c2642..c37250d 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
 
 /**
  * Base class for {@link PulsarTableSink} that serializes data in JSON format.
@@ -33,14 +34,15 @@ public class PulsarJsonTableSink extends PulsarTableSink {
      *
      * @param serviceUrl          pulsar service url
      * @param topic               topic in pulsar to which table is written
-     * @param producerConf        producer configuration
+     * @param authentication      authetication info required by pulsar client
      * @param routingKeyFieldName routing key field name
      */
     public PulsarJsonTableSink(
             String serviceUrl,
             String topic,
+            Authentication authentication,
             String routingKeyFieldName) {
-        super(serviceUrl, topic, routingKeyFieldName);
+        super(serviceUrl, topic, authentication, routingKeyFieldName);
     }
 
     @Override
@@ -53,6 +55,7 @@ public class PulsarJsonTableSink extends PulsarTableSink {
         return new PulsarJsonTableSink(
                 serviceUrl,
                 topic,
+                authentication,
                 routingKeyFieldName);
     }
 }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index 3f30390..3b78495 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -23,12 +23,16 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClientException;
 
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.regex.Pattern;
+import java.util.Map;
 
 /**
  * A class for building a pulsar source.
@@ -43,6 +47,7 @@ public class PulsarSourceBuilder<T> {
     final DeserializationSchema<T> deserializationSchema;
     String serviceUrl = SERVICE_URL;
     final Set<String> topicNames = new TreeSet<>();
+    Authentication authentication;
     Pattern topicsPattern;
     String subscriptionName = "flink-sub";
     long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;
@@ -163,6 +168,62 @@ public class PulsarSourceBuilder<T> {
         throw new IllegalArgumentException("acknowledgementBatchSize can only take values > 0 and <= " + MAX_ACKNOWLEDGEMENT_BATCH_SIZE);
     }
 
+    /**
+     * Set the authentication provider to use in the Pulsar client instance.
+     *
+     * @param authentication an instance of the {@link Authentication} provider already constructed
+     * @return this builder
+     */
+    public PulsarSourceBuilder<T> authentication(Authentication authentication) {
+        Preconditions.checkArgument(authentication != null,
+                "authentication instance can not be null, use new AuthenticationDisabled() to disable authentication");
+        this.authentication = authentication;
+        return this;
+    }
+
+    /**
+     * Configure the authentication provider to use in the Pulsar client instance
+     *
+     * @param authPluginClassName
+     *            name of the Authentication-Plugin to use
+     * @param authParamsString
+     *            string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2"
+     * @return this builder
+     * @throws PulsarClientException.UnsupportedAuthenticationException
+     *             failed to instantiate specified Authentication-Plugin
+     */
+    public PulsarSourceBuilder<T> authentication(String authPluginClassName, String authParamsString)
+            throws PulsarClientException.UnsupportedAuthenticationException {
+        Preconditions.checkArgument(StringUtils.isNotBlank(authPluginClassName),
+                "Authentication-Plugin class name can not be blank");
+        Preconditions.checkArgument(StringUtils.isNotBlank(authParamsString),
+                "Authentication-Plugin parameters can not be blank");
+        this.authentication = AuthenticationFactory.create(authPluginClassName, authParamsString);
+        return this;
+    }
+
+    /**
+     * Configure the authentication provider to use in the Pulsar client instance
+     * using a config map.
+     *
+     * @param authPluginClassName
+     *            name of the Authentication-Plugin you want to use
+     * @param authParams
+     *            map which represents parameters for the Authentication-Plugin
+     * @return this builder
+     * @throws PulsarClientException.UnsupportedAuthenticationException
+     *             failed to instantiate specified Authentication-Plugin
+     */
+    public PulsarSourceBuilder<T> authentication(String authPluginClassName, Map<String, String> authParams)
+            throws PulsarClientException.UnsupportedAuthenticationException {
+        Preconditions.checkArgument(StringUtils.isNotBlank(authPluginClassName),
+                "Authentication-Plugin class name can not be blank");
+        Preconditions.checkArgument((authParams != null && authParams.isEmpty() == false),
+                "parameters to authentication plugin can not be null/empty");
+        this.authentication = AuthenticationFactory.create(authPluginClassName, authParams);
+        return this;
+    }
+
     public SourceFunction<T> build() {
         Preconditions.checkNotNull(serviceUrl, "a service url is required");
         Preconditions.checkArgument((topicNames != null && !topicNames.isEmpty()) || topicsPattern != null,
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
index 0fc45f7..5d20a1d 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtract
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
 
 /**
  * An append-only table sink to emit a streaming table as a Pulsar stream.
@@ -41,6 +42,7 @@ public abstract class PulsarTableSink implements AppendStreamTableSink<Row> {
 
     protected final String serviceUrl;
     protected final String topic;
+    protected Authentication authentication;
     protected SerializationSchema<Row> serializationSchema;
     protected PulsarKeyExtractor<Row> keyExtractor;
     protected String[] fieldNames;
@@ -50,9 +52,11 @@ public abstract class PulsarTableSink implements AppendStreamTableSink<Row> {
     public PulsarTableSink(
             String serviceUrl,
             String topic,
+            Authentication authentication,
             String routingKeyFieldName) {
         this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
         this.topic = checkNotNull(topic, "Topic is null");
+        this.authentication = checkNotNull(authentication, "authentication is null, set new AuthenticationDisabled() instead");
         this.routingKeyFieldName = routingKeyFieldName;
     }
 
@@ -78,6 +82,7 @@ public abstract class PulsarTableSink implements AppendStreamTableSink<Row> {
         return new FlinkPulsarProducer<Row>(
                 serviceUrl,
                 topic,
+                authentication,
                 serializationSchema,
                 keyExtractor);
     }
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
index 62c3b5d..bedcbda 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flink.batch.connectors.pulsar;
 
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertNotNull;
@@ -30,28 +31,28 @@ public class PulsarAvroOutputFormatTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarAvroOutputFormatConstructorWhenServiceUrlIsNull() {
-        new PulsarAvroOutputFormat(null, "testTopic");
+        new PulsarAvroOutputFormat(null, "testTopic", new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarAvroOutputFormatConstructorWhenTopicNameIsNull() {
-        new PulsarAvroOutputFormat("testServiceUrl", null);
+        new PulsarAvroOutputFormat("testServiceUrl", null, new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarAvroOutputFormatConstructorWhenTopicNameIsBlank() {
-        new PulsarAvroOutputFormat("testServiceUrl", " ");
+        new PulsarAvroOutputFormat("testServiceUrl", " ", new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarAvroOutputFormatConstructorWhenServiceUrlIsBlank() {
-        new PulsarAvroOutputFormat(" ", "testTopic");
+        new PulsarAvroOutputFormat(" ", "testTopic", new AuthenticationDisabled());
     }
 
     @Test
     public void testPulsarAvroOutputFormatConstructor() {
         PulsarAvroOutputFormat pulsarAvroOutputFormat =
-                new PulsarAvroOutputFormat("testServiceUrl", "testTopic");
+                new PulsarAvroOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled());
         assertNotNull(pulsarAvroOutputFormat);
     }
 }
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
index a564a89..caccb6b 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flink.batch.connectors.pulsar;
 
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertNotNull;
@@ -29,28 +30,28 @@ public class PulsarCsvOutputFormatTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarCsvOutputFormatConstructorWhenServiceUrlIsNull() {
-        new PulsarCsvOutputFormat(null, "testTopic");
+        new PulsarCsvOutputFormat(null, "testTopic", new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarCsvOutputFormatConstructorWhenTopicNameIsNull() {
-        new PulsarCsvOutputFormat("testServiceUrl", null);
+        new PulsarCsvOutputFormat("testServiceUrl", null, new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarCsvOutputFormatConstructorWhenTopicNameIsBlank() {
-        new PulsarCsvOutputFormat("testServiceUrl", " ");
+        new PulsarCsvOutputFormat("testServiceUrl", " ", new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarCsvOutputFormatConstructorWhenServiceUrlIsBlank() {
-        new PulsarCsvOutputFormat(" ", "testTopic");
+        new PulsarCsvOutputFormat(" ", "testTopic", new AuthenticationDisabled());
     }
 
     @Test
     public void testPulsarCsvOutputFormatConstructor() {
         PulsarCsvOutputFormat pulsarCsvOutputFormat =
-                new PulsarCsvOutputFormat("testServiceUrl", "testTopic");
+                new PulsarCsvOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled());
         assertNotNull(pulsarCsvOutputFormat);
     }
 }
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
index b9953cf..4ab7232 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flink.batch.connectors.pulsar;
 
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertNotNull;
@@ -29,28 +30,28 @@ public class PulsarJsonOutputFormatTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsNull() {
-        new PulsarJsonOutputFormat(null, "testTopic");
+        new PulsarJsonOutputFormat(null, "testTopic", new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsNull() {
-        new PulsarJsonOutputFormat("testServiceUrl", null);
+        new PulsarJsonOutputFormat("testServiceUrl", null, new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsBlank() {
-        new PulsarJsonOutputFormat("testServiceUrl", " ");
+        new PulsarJsonOutputFormat("testServiceUrl", " ", new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsBlank() {
-        new PulsarJsonOutputFormat(" ", "testTopic");
+        new PulsarJsonOutputFormat(" ", "testTopic", new AuthenticationDisabled());
     }
 
     @Test
     public void testPulsarJsonOutputFormatConstructor() {
         PulsarJsonOutputFormat pulsarJsonOutputFormat =
-                new PulsarJsonOutputFormat("testServiceUrl", "testTopic");
+                new PulsarJsonOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled());
         assertNotNull(pulsarJsonOutputFormat);
     }
 }
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
index 41cf8b2..238c49b 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.batch.connectors.pulsar;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
@@ -35,34 +36,34 @@ public class PulsarOutputFormatTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarOutputFormatConstructorWhenServiceUrlIsNull() {
-        new PulsarOutputFormat(null, "testTopic", text -> text.toString().getBytes());
+        new PulsarOutputFormat(null, "testTopic", new AuthenticationDisabled(), text -> text.toString().getBytes());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarOutputFormatConstructorWhenTopicNameIsNull() {
-        new PulsarOutputFormat("testServiceUrl", null, text -> text.toString().getBytes());
+        new PulsarOutputFormat("testServiceUrl", null, new AuthenticationDisabled(), text -> text.toString().getBytes());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarOutputFormatConstructorWhenTopicNameIsBlank() {
-        new PulsarOutputFormat("testServiceUrl", " ", text -> text.toString().getBytes());
+        new PulsarOutputFormat("testServiceUrl", " ", new AuthenticationDisabled(), text -> text.toString().getBytes());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarOutputFormatConstructorWhenServiceUrlIsBlank() {
-        new PulsarOutputFormat(" ", "testTopic", text -> text.toString().getBytes());
+        new PulsarOutputFormat(" ", "testTopic", new AuthenticationDisabled(), text -> text.toString().getBytes());
     }
 
     @Test(expectedExceptions = NullPointerException.class)
     public void testPulsarOutputFormatConstructorWhenSerializationSchemaIsNull() {
-        new PulsarOutputFormat("testServiceUrl", "testTopic", null);
+        new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(), null);
     }
 
     @Test
     public void testPulsarOutputFormatWithStringSerializationSchema() throws IOException {
         String input = "Wolfgang Amadeus Mozart";
         PulsarOutputFormat pulsarOutputFormat =
-                new PulsarOutputFormat("testServiceUrl", "testTopic",
+                new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(),
                         text -> text.toString().getBytes());
         assertNotNull(pulsarOutputFormat);
         byte[] bytes = pulsarOutputFormat.serializationSchema.serialize(input);
@@ -74,7 +75,7 @@ public class PulsarOutputFormatTest {
     public void testPulsarOutputFormatWithCustomSerializationSchema() throws IOException {
         Employee employee = new Employee(1, "Test Employee", "Test Department");
         PulsarOutputFormat pulsarOutputFormat =
-                new PulsarOutputFormat("testServiceUrl", "testTopic",
+                new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(),
                         new EmployeeSerializationSchema());
         assertNotNull(pulsarOutputFormat);
 
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
index 85bb2ed..125ee4a 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 import org.powermock.api.mockito.PowerMockito;
@@ -39,6 +41,7 @@ import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
 public class PulsarAvroTableSinkTest {
     private static final String SERVICE_URL = "pulsar://localhost:6650";
     private static final String TOPIC_NAME = "test_topic";
+    private static final Authentication AUTHENTICATION = new AuthenticationDisabled();
     private static final String ROUTING_KEY = "name";
 
     private final String[] fieldNames = {"id", "name","start_year","end_year"};
@@ -86,13 +89,14 @@ public class PulsarAvroTableSinkTest {
 
     private PulsarAvroTableSink spySink() throws Exception {
 
-        PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, TOPIC_NAME, ROUTING_KEY, NasaMission.class);
+        PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, TOPIC_NAME, AUTHENTICATION, ROUTING_KEY, NasaMission.class);
         FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
         PowerMockito.whenNew(
                 FlinkPulsarProducer.class
         ).withArguments(
                 Mockito.anyString(),
                 Mockito.anyString(),
+                Mockito.any(Authentication.class),
                 Mockito.any(SerializationSchema.class),
                 Mockito.any(PulsarKeyExtractor.class)
         ).thenReturn(producer);
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
index 9ceefff..c42ae6c 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 import org.powermock.api.mockito.PowerMockito;
@@ -39,6 +41,7 @@ public class PulsarJsonTableSinkTest {
 
     private static final String SERVICE_URL = "pulsar://localhost:6650";
     private static final String TOPIC_NAME = "test_topic";
+    private static final Authentication AUTHENTICATION = new AuthenticationDisabled();
     private static final String ROUTING_KEY = "key";
     private final String[] fieldNames = {"key", "value"};
     private final TypeInformation[] typeInformations = {
@@ -80,13 +83,14 @@ public class PulsarJsonTableSinkTest {
     }
 
     private PulsarJsonTableSink spySink() throws Exception {
-        PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, TOPIC_NAME, ROUTING_KEY);
+        PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, TOPIC_NAME, AUTHENTICATION, ROUTING_KEY);
         FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
         PowerMockito.whenNew(
                 FlinkPulsarProducer.class
         ).withArguments(
                 Mockito.anyString(),
                 Mockito.anyString(),
+                Mockito.any(Authentication.class),
                 Mockito.any(SerializationSchema.class),
                 Mockito.any(PulsarKeyExtractor.class)
         ).thenReturn(producer);


Mime
View raw message