From commits-return-25988-archive-asf-public=cust-asf.ponee.io@pulsar.apache.org Tue Apr 2 16:02:09 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 6E886180668 for ; Tue, 2 Apr 2019 18:02:08 +0200 (CEST) Received: (qmail 29380 invoked by uid 500); 2 Apr 2019 16:02:07 -0000 Mailing-List: contact commits-help@pulsar.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.apache.org Delivered-To: mailing list commits@pulsar.apache.org Received: (qmail 29366 invoked by uid 99); 2 Apr 2019 16:02:07 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Apr 2019 16:02:07 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id D1A4285C80; Tue, 2 Apr 2019 16:02:06 +0000 (UTC) Date: Tue, 02 Apr 2019 16:02:06 +0000 To: "commits@pulsar.apache.org" Subject: [pulsar] branch master updated: issue#3939 : Allow client authentication from pulsar-flink package (#3949) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <155422092651.18340.15068429401699181653@gitbox.apache.org> From: mmerli@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: pulsar X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 7b683760ac3fac7e2a3075a7f2b28724b881d21f X-Git-Newrev: 8adff85d8fece38428f28c1288c0ebb7dde3a744 X-Git-Rev: 8adff85d8fece38428f28c1288c0ebb7dde3a744 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8adff85 issue#3939 : Allow client authentication from pulsar-flink package (#3949) 8adff85 is described below commit 8adff85d8fece38428f28c1288c0ebb7dde3a744 Author: Shivji Kumar Jha AuthorDate: Tue Apr 2 21:32:01 2019 +0530 issue#3939 : Allow client authentication from pulsar-flink package (#3949) Problem: ======== pulsar-flink module (aka flink connector) internally uses pulsar-client. Though the pulsar client allows setting tokens in the client builder, the flink connector does not provide a way to pass authentication token to the pulsar client it uses internally. Solution: ======== Accept authentication information as an input in pulsar-flink module. Pass this authentication information to pulsar-client. --- .../example/FlinkPulsarBatchAvroSinkExample.java | 3 +- .../example/FlinkPulsarBatchCsvSinkExample.java | 3 +- .../example/FlinkPulsarBatchJsonSinkExample.java | 3 +- .../example/FlinkPulsarBatchSinkExample.java | 3 +- .../example/PulsarConsumerSourceWordCount.java | 2 + ...lsarConsumerSourceWordCountToAvroTableSink.java | 3 +- ...lsarConsumerSourceWordCountToJsonTableSink.java | 3 +- .../FlinkPulsarBatchAvroSinkScalaExample.scala | 3 +- .../FlinkPulsarBatchCsvSinkScalaExample.scala | 3 +- .../FlinkPulsarBatchJsonSinkScalaExample.scala | 3 +- .../example/FlinkPulsarBatchSinkScalaExample.scala | 3 +- .../connectors/pulsar/BasePulsarOutputFormat.java | 17 +++--- .../connectors/pulsar/PulsarAvroOutputFormat.java | 5 +- .../connectors/pulsar/PulsarCsvOutputFormat.java | 5 +- .../connectors/pulsar/PulsarJsonOutputFormat.java | 5 +- .../connectors/pulsar/PulsarOutputFormat.java | 5 +- .../connectors/pulsar/FlinkPulsarProducer.java | 14 ++++- .../connectors/pulsar/PulsarAvroTableSink.java | 8 ++- .../connectors/pulsar/PulsarConsumerSource.java | 4 ++ 
.../connectors/pulsar/PulsarJsonTableSink.java | 7 ++- .../connectors/pulsar/PulsarSourceBuilder.java | 61 ++++++++++++++++++++++ .../connectors/pulsar/PulsarTableSink.java | 5 ++ .../pulsar/PulsarAvroOutputFormatTest.java | 11 ++-- .../pulsar/PulsarCsvOutputFormatTest.java | 11 ++-- .../pulsar/PulsarJsonOutputFormatTest.java | 11 ++-- .../connectors/pulsar/PulsarOutputFormatTest.java | 15 +++--- .../connectors/pulsar/PulsarAvroTableSinkTest.java | 6 ++- .../connectors/pulsar/PulsarJsonTableSinkTest.java | 6 ++- 28 files changed, 174 insertions(+), 54 deletions(-) diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java index 1349dba..6d077f9 100644 --- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java +++ b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.avro.generated.NasaMission; import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import java.util.Arrays; import java.util.List; @@ -63,7 +64,7 @@ public class FlinkPulsarBatchAvroSinkExample { System.out.println("\tTopic:\t" + topic); // create PulsarAvroOutputFormat instance - final OutputFormat pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(serviceUrl, topic); + final OutputFormat pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled()); // create DataSet DataSet nasaMissionDS = env.fromCollection(nasaMissions); diff --git 
a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java index 3e658dc..4abb0a4 100644 --- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java +++ b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import java.util.Arrays; import java.util.List; @@ -65,7 +66,7 @@ public class FlinkPulsarBatchCsvSinkExample { // create PulsarCsvOutputFormat instance final OutputFormat> pulsarCsvOutputFormat = - new PulsarCsvOutputFormat<>(serviceUrl, topic); + new PulsarCsvOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled()); // create DataSet DataSet> nasaMissionDS = env.fromCollection(nasaMissions); diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java index 3937ae9..dc56364 100644 --- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java +++ b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat; +import 
org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import java.util.Arrays; import java.util.List; @@ -62,7 +63,7 @@ public class FlinkPulsarBatchJsonSinkExample { System.out.println("\tTopic:\t" + topic); // create PulsarJsonOutputFormat instance - final OutputFormat pulsarJsonOutputFormat = new PulsarJsonOutputFormat<>(serviceUrl, topic); + final OutputFormat pulsarJsonOutputFormat = new PulsarJsonOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled()); // create DataSet DataSet nasaMissionDS = env.fromCollection(nasaMissions); diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java index c90d016..2c89579 100644 --- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java +++ b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java @@ -27,6 +27,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat; import org.apache.flink.util.Collector; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; /** * Implements a batch word-count program on Pulsar topic by writing Flink DataSet. 
@@ -60,7 +61,7 @@ public class FlinkPulsarBatchSinkExample { // create PulsarOutputFormat instance final OutputFormat pulsarOutputFormat = - new PulsarOutputFormat(serviceUrl, topic, wordWithCount -> wordWithCount.toString().getBytes()); + new PulsarOutputFormat(serviceUrl, topic, new AuthenticationDisabled(), wordWithCount -> wordWithCount.toString().getBytes()); // create DataSet DataSet textDS = env.fromElements(EINSTEIN_QUOTE); diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java index 942ddc1..a323f8f 100644 --- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java +++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java @@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer; import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; /** * Implements a streaming wordcount program on pulsar topics. 
@@ -97,6 +98,7 @@ public class PulsarConsumerSourceWordCount { wc.addSink(new FlinkPulsarProducer<>( serviceUrl, outputTopic, + new AuthenticationDisabled(), wordWithCount -> wordWithCount.toString().getBytes(UTF_8), wordWithCount -> wordWithCount.word )).setParallelism(parallelism); diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java index 9fdc9a2..84e85e8 100644 --- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java +++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java @@ -35,6 +35,7 @@ import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.sinks.CsvTableSink; import org.apache.flink.table.sinks.TableSink; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; /** * Implements a streaming wordcount program on pulsar topics. 
@@ -107,7 +108,7 @@ public class PulsarConsumerSourceWordCountToAvroTableSink { table.printSchema(); TableSink sink = null; if (null != outputTopic) { - sink = new PulsarAvroTableSink(serviceUrl, outputTopic, ROUTING_KEY, WordWithCount.class); + sink = new PulsarAvroTableSink(serviceUrl, outputTopic, new AuthenticationDisabled(), ROUTING_KEY, WordWithCount.class); } else { // print the results with a csv file sink = new CsvTableSink("./examples/file", "|"); diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java index 1be9dde..a4f9c3c 100644 --- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java +++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java @@ -37,6 +37,7 @@ import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.sinks.CsvTableSink; import org.apache.flink.table.sinks.TableSink; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; /** * Implements a streaming wordcount program on pulsar topics. 
@@ -108,7 +109,7 @@ public class PulsarConsumerSourceWordCountToJsonTableSink { table.printSchema(); TableSink sink = null; if (null != outputTopic) { - sink = new PulsarJsonTableSink(serviceUrl, outputTopic, ROUTING_KEY); + sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new AuthenticationDisabled(), ROUTING_KEY); } else { // print the results with a csv file sink = new CsvTableSink("./examples/file", "|"); diff --git a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala index f10d6c1..a64e656 100644 --- a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala +++ b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala @@ -22,6 +22,7 @@ import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.api.scala._ import org.apache.flink.avro.generated.NasaMission import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled /** * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Avro. 
@@ -59,7 +60,7 @@ object FlinkPulsarBatchAvroSinkScalaExample { // create PulsarCsvOutputFormat instance val pulsarAvroOutputFormat = - new PulsarAvroOutputFormat[NasaMission](serviceUrl, topic) + new PulsarAvroOutputFormat[NasaMission](serviceUrl, topic, new AuthenticationDisabled()) // create DataSet val textDS = env.fromCollection(nasaMissions) diff --git a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala index 3233616..302d0ab 100644 --- a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala +++ b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala @@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple4 import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.api.scala._ import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled /** * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Csv. 
@@ -65,7 +66,7 @@ object FlinkPulsarBatchCsvSinkScalaExample { // create PulsarCsvOutputFormat instance val pulsarCsvOutputFormat = - new PulsarCsvOutputFormat[NasaMission](serviceUrl, topic) + new PulsarCsvOutputFormat[NasaMission](serviceUrl, topic, new AuthenticationDisabled()) // create DataSet val textDS = env.fromCollection(nasaMissions) diff --git a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala index 60d02e5..9518751 100644 --- a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala +++ b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala @@ -22,6 +22,7 @@ import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.api.scala._ import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat import scala.beans.BeanProperty +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled /** * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Json. 
@@ -66,7 +67,7 @@ object FlinkPulsarBatchJsonSinkScalaExample { println("\tTopic:\t" + topic) // create PulsarJsonOutputFormat instance - val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](serviceUrl, topic) + val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](serviceUrl, topic, new AuthenticationDisabled()) // create DataSet val nasaMissionDS = env.fromCollection(nasaMissions) diff --git a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala index 4de0dcb..369e56d 100644 --- a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala +++ b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala @@ -23,6 +23,7 @@ import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.api.scala._ import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat import org.apache.flink.util.Collector +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled /** * Data type for words with count. 
@@ -63,7 +64,7 @@ object FlinkPulsarBatchSinkScalaExample { // create PulsarOutputFormat instance val pulsarOutputFormat = - new PulsarOutputFormat[WordWithCount](serviceUrl, topic, new SerializationSchema[WordWithCount] { + new PulsarOutputFormat[WordWithCount](serviceUrl, topic, new AuthenticationDisabled(), new SerializationSchema[WordWithCount] { override def serialize(wordWithCount: WordWithCount): Array[Byte] = wordWithCount.toString.getBytes }) diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java index ca34327..644c8e9 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java +++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java @@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Authentication; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,14 +47,16 @@ public abstract class BasePulsarOutputFormat extends RichOutputFormat { protected final String serviceUrl; protected final String topicName; + private final Authentication authentication; protected SerializationSchema serializationSchema; - protected BasePulsarOutputFormat(final String serviceUrl, final String topicName) { + protected BasePulsarOutputFormat(final String serviceUrl, final String topicName, final Authentication authentication) { Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank."); Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicName cannot be blank."); this.serviceUrl = serviceUrl; this.topicName = topicName; + this.authentication = authentication; 
LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.topicName); } @@ -65,7 +68,7 @@ public abstract class BasePulsarOutputFormat extends RichOutputFormat { @Override public void open(int taskNumber, int numTasks) throws IOException { - this.producer = getProducerInstance(serviceUrl, topicName); + this.producer = getProducerInstance(serviceUrl, topicName, authentication); this.failureCallback = cause -> { LOG.error("Error while sending record to Pulsar: " + cause.getMessage(), cause); @@ -85,11 +88,12 @@ public abstract class BasePulsarOutputFormat extends RichOutputFormat { } - private static Producer getProducerInstance(String serviceUrl, String topicName) throws PulsarClientException { + private static Producer getProducerInstance(String serviceUrl, String topicName, Authentication authentication) + throws PulsarClientException { if(producer == null){ synchronized (PulsarOutputFormat.class) { if(producer == null){ - producer = Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName), + producer = Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName, authentication), "Pulsar producer cannot be null."); } } @@ -97,9 +101,10 @@ public abstract class BasePulsarOutputFormat extends RichOutputFormat { return producer; } - private static Producer createPulsarProducer(String serviceUrl, String topicName) throws PulsarClientException { + private static Producer createPulsarProducer(String serviceUrl, String topicName, Authentication authentication) + throws PulsarClientException { try { - PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build(); + PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build(); return client.newProducer().topic(topicName).create(); } catch (PulsarClientException e) { LOG.error("Pulsar producer cannot be created.", e); diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java index d15dfe7..52484ef 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java +++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java @@ -20,6 +20,7 @@ package org.apache.flink.batch.connectors.pulsar; import org.apache.avro.specific.SpecificRecord; import org.apache.flink.batch.connectors.pulsar.serialization.AvroSerializationSchema; +import org.apache.pulsar.client.api.Authentication; /** * Pulsar Avro Output Format to write Flink DataSets into a Pulsar topic in Avro format. @@ -28,8 +29,8 @@ public class PulsarAvroOutputFormat extends BasePulsar private static final long serialVersionUID = -6794070714728773530L; - public PulsarAvroOutputFormat(String serviceUrl, String topicName) { - super(serviceUrl, topicName); + public PulsarAvroOutputFormat(String serviceUrl, String topicName, Authentication authentication) { + super(serviceUrl, topicName, authentication); this.serializationSchema = new AvroSerializationSchema(); } diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java index adae9f7..d36a260 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java +++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java @@ -20,6 +20,7 @@ package org.apache.flink.batch.connectors.pulsar; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.batch.connectors.pulsar.serialization.CsvSerializationSchema; +import org.apache.pulsar.client.api.Authentication; /** * Pulsar Csv Output Format to write Flink DataSets into a Pulsar topic in 
Csv format. @@ -28,8 +29,8 @@ public class PulsarCsvOutputFormat extends BasePulsarOutputForm private static final long serialVersionUID = -4461671510903404196L; - public PulsarCsvOutputFormat(String serviceUrl, String topicName) { - super(serviceUrl, topicName); + public PulsarCsvOutputFormat(String serviceUrl, String topicName, Authentication authentication) { + super(serviceUrl, topicName, authentication); this.serializationSchema = new CsvSerializationSchema<>(); } diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java index 3fe5baa..96d7a01 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java +++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java @@ -19,6 +19,7 @@ package org.apache.flink.batch.connectors.pulsar; import org.apache.flink.batch.connectors.pulsar.serialization.JsonSerializationSchema; +import org.apache.pulsar.client.api.Authentication; /** * Pulsar Json Output Format to write Flink DataSets into a Pulsar topic in Json format. 
@@ -27,8 +28,8 @@ public class PulsarJsonOutputFormat extends BasePulsarOutputFormat { private static final long serialVersionUID = 8499620770848461958L; - public PulsarJsonOutputFormat(String serviceUrl, String topicName) { - super(serviceUrl, topicName); + public PulsarJsonOutputFormat(String serviceUrl, String topicName, Authentication authentication) { + super(serviceUrl, topicName, authentication); this.serializationSchema = new JsonSerializationSchema(); } } diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java index 889970f..393faaf 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java +++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java @@ -20,6 +20,7 @@ package org.apache.flink.batch.connectors.pulsar; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.util.Preconditions; +import org.apache.pulsar.client.api.Authentication; /** * Pulsar Output Format to write Flink DataSets into a Pulsar topic in user-defined format. 
@@ -28,8 +29,8 @@ public class PulsarOutputFormat extends BasePulsarOutputFormat { private static final long serialVersionUID = 2997027580167793000L; - public PulsarOutputFormat(String serviceUrl, String topicName, final SerializationSchema serializationSchema) { - super(serviceUrl, topicName); + public PulsarOutputFormat(String serviceUrl, String topicName, Authentication authentication, final SerializationSchema serializationSchema) { + super(serviceUrl, topicName, authentication); Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null."); this.serializationSchema = serializationSchema; } diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java index c0d3905..55eb619 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java @@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +66,12 @@ public class FlinkPulsarProducer protected final String defaultTopicName; /** + * Pulsar client will use this authentication information, if required. + */ + private final Authentication authentication; + + + /** * (Serializable) SerializationSchema for turning objects used with Flink into. * byte[] for Pulsar. 
*/ @@ -121,13 +128,15 @@ public class FlinkPulsarProducer public FlinkPulsarProducer(String serviceUrl, String defaultTopicName, + Authentication authentication, SerializationSchema serializationSchema, PulsarKeyExtractor keyExtractor) { - this(serviceUrl, defaultTopicName, serializationSchema, keyExtractor, null); + this(serviceUrl, defaultTopicName, authentication, serializationSchema, keyExtractor, null); } public FlinkPulsarProducer(String serviceUrl, String defaultTopicName, + Authentication authentication, SerializationSchema serializationSchema, PulsarKeyExtractor keyExtractor, Map producerConfig) { @@ -135,6 +144,7 @@ public class FlinkPulsarProducer checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank"); this.serviceUrl = serviceUrl; this.defaultTopicName = defaultTopicName; + this.authentication = authentication; this.schema = checkNotNull(serializationSchema, "Serialization Schema not set"); this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor); ClosureCleaner.ensureSerializable(serializationSchema); @@ -190,7 +200,7 @@ public class FlinkPulsarProducer } private Producer createProducer() throws Exception { - PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build(); + PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build(); ProducerBuilder producerBuilder = client.newProducer(); if (producerConfig != null) { producerBuilder = producerBuilder.loadConf(producerConfig); diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java index b370345..20999fd 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java @@ -36,6 +36,7 @@ import 
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtract import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; +import org.apache.pulsar.client.api.Authentication; /** * An append-only table sink to emit a streaming table as a Pulsar stream that serializes data in Avro format. @@ -44,6 +45,7 @@ public class PulsarAvroTableSink implements AppendStreamTableSink { protected final String serviceUrl; protected final String topic; + protected final Authentication authentication; protected final String routingKeyFieldName; protected SerializationSchema serializationSchema; protected String[] fieldNames; @@ -56,16 +58,17 @@ public class PulsarAvroTableSink implements AppendStreamTableSink { * * @param serviceUrl pulsar service url * @param topic topic in pulsar to which table is written - * @param producerConf producer configuration * @param routingKeyFieldName routing key field name */ public PulsarAvroTableSink( String serviceUrl, String topic, + Authentication authentication, String routingKeyFieldName, Class recordClazz) { this.serviceUrl = checkNotNull(serviceUrl, "Service url not set"); this.topic = checkNotNull(topic, "Topic is null"); + this.authentication = checkNotNull(authentication, "authentication is null, set new AuthenticationDisabled() instead"); this.routingKeyFieldName = routingKeyFieldName; this.recordClazz = recordClazz; } @@ -78,6 +81,7 @@ public class PulsarAvroTableSink implements AppendStreamTableSink { return new FlinkPulsarProducer( serviceUrl, topic, + authentication, serializationSchema, keyExtractor); } @@ -110,7 +114,7 @@ public class PulsarAvroTableSink implements AppendStreamTableSink { @Override public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { - PulsarAvroTableSink sink = new PulsarAvroTableSink(serviceUrl, topic, routingKeyFieldName, recordClazz); + PulsarAvroTableSink sink = new 
PulsarAvroTableSink(serviceUrl, topic, authentication, routingKeyFieldName, recordClazz); sink.fieldNames = checkNotNull(fieldNames, "Field names are null"); sink.fieldTypes = checkNotNull(fieldTypes, "Field types are null"); diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java index 6479bf0..5af82bc 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java @@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.Authentication; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +59,7 @@ class PulsarConsumerSource extends MessageAcknowledgingSourceBase topicNames; + private final Authentication authentication; private final Pattern topicsPattern; private final String subscriptionName; private final DeserializationSchema deserializer; @@ -75,6 +77,7 @@ class PulsarConsumerSource extends MessageAcknowledgingSourceBase builder) { super(MessageId.class); this.serviceUrl = builder.serviceUrl; + this.authentication = builder.authentication; this.topicNames = builder.topicNames; this.topicsPattern = builder.topicsPattern; this.deserializer = builder.deserializationSchema; @@ -191,6 +194,7 @@ class PulsarConsumerSource extends MessageAcknowledgingSourceBase { final DeserializationSchema deserializationSchema; String serviceUrl = SERVICE_URL; final Set topicNames = new TreeSet<>(); + Authentication authentication; Pattern topicsPattern; String subscriptionName = "flink-sub"; long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE; @@ -163,6 
+168,62 @@ public class PulsarSourceBuilder { throw new IllegalArgumentException("acknowledgementBatchSize can only take values > 0 and <= " + MAX_ACKNOWLEDGEMENT_BATCH_SIZE); } + /** + * Set the authentication provider to use in the Pulsar client instance. + * + * @param authentication an instance of the {@link Authentication} provider already constructed + * @return this builder + */ + public PulsarSourceBuilder authentication(Authentication authentication) { + Preconditions.checkArgument(authentication != null, + "authentication instance can not be null, use new AuthenticationDisabled() to disable authentication"); + this.authentication = authentication; + return this; + } + + /** + * Configure the authentication provider to use in the Pulsar client instance + * + * @param authPluginClassName + * name of the Authentication-Plugin to use + * @param authParamsString + * string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2" + * @return this builder + * @throws PulsarClientException.UnsupportedAuthenticationException + * failed to instantiate specified Authentication-Plugin + */ + public PulsarSourceBuilder authentication(String authPluginClassName, String authParamsString) + throws PulsarClientException.UnsupportedAuthenticationException { + Preconditions.checkArgument(StringUtils.isNotBlank(authPluginClassName), + "Authentication-Plugin class name can not be blank"); + Preconditions.checkArgument(StringUtils.isNotBlank(authParamsString), + "Authentication-Plugin parameters can not be blank"); + this.authentication = AuthenticationFactory.create(authPluginClassName, authParamsString); + return this; + } + + /** + * Configure the authentication provider to use in the Pulsar client instance + * using a config map. 
+ * + * @param authPluginClassName + * name of the Authentication-Plugin you want to use + * @param authParams + * map which represents parameters for the Authentication-Plugin + * @return this builder + * @throws PulsarClientException.UnsupportedAuthenticationException + * failed to instantiate specified Authentication-Plugin + */ + public PulsarSourceBuilder authentication(String authPluginClassName, Map authParams) + throws PulsarClientException.UnsupportedAuthenticationException { + Preconditions.checkArgument(StringUtils.isNotBlank(authPluginClassName), + "Authentication-Plugin class name can not be blank"); + Preconditions.checkArgument((authParams != null && authParams.isEmpty() == false), + "parameters to authentication plugin can not be null/empty"); + this.authentication = AuthenticationFactory.create(authPluginClassName, authParams); + return this; + } + public SourceFunction build() { Preconditions.checkNotNull(serviceUrl, "a service url is required"); Preconditions.checkArgument((topicNames != null && !topicNames.isEmpty()) || topicsPattern != null, diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java index 0fc45f7..5d20a1d 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java @@ -33,6 +33,7 @@ import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtract import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; +import org.apache.pulsar.client.api.Authentication; /** * An append-only table sink to emit a streaming table as a Pulsar stream. 
@@ -41,6 +42,7 @@ public abstract class PulsarTableSink implements AppendStreamTableSink { protected final String serviceUrl; protected final String topic; + protected Authentication authentication; protected SerializationSchema serializationSchema; protected PulsarKeyExtractor keyExtractor; protected String[] fieldNames; @@ -50,9 +52,11 @@ public abstract class PulsarTableSink implements AppendStreamTableSink { public PulsarTableSink( String serviceUrl, String topic, + Authentication authentication, String routingKeyFieldName) { this.serviceUrl = checkNotNull(serviceUrl, "Service url not set"); this.topic = checkNotNull(topic, "Topic is null"); + this.authentication = checkNotNull(authentication, "authentication is null, set new AuthenticationDisabled() instead"); this.routingKeyFieldName = routingKeyFieldName; } @@ -78,6 +82,7 @@ public abstract class PulsarTableSink implements AppendStreamTableSink { return new FlinkPulsarProducer( serviceUrl, topic, + authentication, serializationSchema, keyExtractor); } diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java index 62c3b5d..bedcbda 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java @@ -18,6 +18,7 @@ */ package org.apache.flink.batch.connectors.pulsar; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.testng.annotations.Test; import static org.testng.Assert.assertNotNull; @@ -30,28 +31,28 @@ public class PulsarAvroOutputFormatTest { @Test(expectedExceptions = IllegalArgumentException.class) public void testPulsarAvroOutputFormatConstructorWhenServiceUrlIsNull() { - new PulsarAvroOutputFormat(null, "testTopic"); + new PulsarAvroOutputFormat(null, "testTopic", new 
AuthenticationDisabled()); } @Test(expectedExceptions = IllegalArgumentException.class) public void testPulsarAvroOutputFormatConstructorWhenTopicNameIsNull() { - new PulsarAvroOutputFormat("testServiceUrl", null); + new PulsarAvroOutputFormat("testServiceUrl", null, new AuthenticationDisabled()); } @Test(expectedExceptions = IllegalArgumentException.class) public void testPulsarAvroOutputFormatConstructorWhenTopicNameIsBlank() { - new PulsarAvroOutputFormat("testServiceUrl", " "); + new PulsarAvroOutputFormat("testServiceUrl", " ", new AuthenticationDisabled()); } @Test(expectedExceptions = IllegalArgumentException.class) public void testPulsarAvroOutputFormatConstructorWhenServiceUrlIsBlank() { - new PulsarAvroOutputFormat(" ", "testTopic"); + new PulsarAvroOutputFormat(" ", "testTopic", new AuthenticationDisabled()); } @Test public void testPulsarAvroOutputFormatConstructor() { PulsarAvroOutputFormat pulsarAvroOutputFormat = - new PulsarAvroOutputFormat("testServiceUrl", "testTopic"); + new PulsarAvroOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled()); assertNotNull(pulsarAvroOutputFormat); } } diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java index a564a89..caccb6b 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java @@ -18,6 +18,7 @@ */ package org.apache.flink.batch.connectors.pulsar; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.testng.annotations.Test; import static org.testng.Assert.assertNotNull; @@ -29,28 +30,28 @@ public class PulsarCsvOutputFormatTest { @Test(expectedExceptions = IllegalArgumentException.class) public void 
testPulsarCsvOutputFormatConstructorWhenServiceUrlIsNull() { - new PulsarCsvOutputFormat(null, "testTopic"); + new PulsarCsvOutputFormat(null, "testTopic", new AuthenticationDisabled()); } @Test(expectedExceptions = IllegalArgumentException.class) public void testPulsarCsvOutputFormatConstructorWhenTopicNameIsNull() { - new PulsarCsvOutputFormat("testServiceUrl", null); + new PulsarCsvOutputFormat("testServiceUrl", null, new AuthenticationDisabled()); } @Test(expectedExceptions = IllegalArgumentException.class) public void testPulsarCsvOutputFormatConstructorWhenTopicNameIsBlank() { - new PulsarCsvOutputFormat("testServiceUrl", " "); + new PulsarCsvOutputFormat("testServiceUrl", " ", new AuthenticationDisabled()); } @Test(expectedExceptions = IllegalArgumentException.class) public void testPulsarCsvOutputFormatConstructorWhenServiceUrlIsBlank() { - new PulsarCsvOutputFormat(" ", "testTopic"); + new PulsarCsvOutputFormat(" ", "testTopic", new AuthenticationDisabled()); } @Test public void testPulsarCsvOutputFormatConstructor() { PulsarCsvOutputFormat pulsarCsvOutputFormat = - new PulsarCsvOutputFormat("testServiceUrl", "testTopic"); + new PulsarCsvOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled()); assertNotNull(pulsarCsvOutputFormat); } } diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java index b9953cf..4ab7232 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java @@ -18,6 +18,7 @@ */ package org.apache.flink.batch.connectors.pulsar; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.testng.annotations.Test; import static org.testng.Assert.assertNotNull; @@ -29,28 +30,28 @@ public class 
PulsarJsonOutputFormatTest { @Test(expectedExceptions = IllegalArgumentException.class) public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsNull() { - new PulsarJsonOutputFormat(null, "testTopic"); + new PulsarJsonOutputFormat(null, "testTopic", new AuthenticationDisabled()); } @Test(expectedExceptions = IllegalArgumentException.class) public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsNull() { - new PulsarJsonOutputFormat("testServiceUrl", null); + new PulsarJsonOutputFormat("testServiceUrl", null, new AuthenticationDisabled()); } @Test(expectedExceptions = IllegalArgumentException.class) public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsBlank() { - new PulsarJsonOutputFormat("testServiceUrl", " "); + new PulsarJsonOutputFormat("testServiceUrl", " ", new AuthenticationDisabled()); } @Test(expectedExceptions = IllegalArgumentException.class) public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsBlank() { - new PulsarJsonOutputFormat(" ", "testTopic"); + new PulsarJsonOutputFormat(" ", "testTopic", new AuthenticationDisabled()); } @Test public void testPulsarJsonOutputFormatConstructor() { PulsarJsonOutputFormat pulsarJsonOutputFormat = - new PulsarJsonOutputFormat("testServiceUrl", "testTopic"); + new PulsarJsonOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled()); assertNotNull(pulsarJsonOutputFormat); } } diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java index 41cf8b2..238c49b 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java @@ -20,6 +20,7 @@ package org.apache.flink.batch.connectors.pulsar; import org.apache.commons.io.IOUtils; import 
org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.testng.annotations.Test; import java.io.IOException; @@ -35,34 +36,34 @@ public class PulsarOutputFormatTest { @Test(expectedExceptions = IllegalArgumentException.class) public void testPulsarOutputFormatConstructorWhenServiceUrlIsNull() { - new PulsarOutputFormat(null, "testTopic", text -> text.toString().getBytes()); + new PulsarOutputFormat(null, "testTopic", new AuthenticationDisabled(), text -> text.toString().getBytes()); } @Test(expectedExceptions = IllegalArgumentException.class) public void testPulsarOutputFormatConstructorWhenTopicNameIsNull() { - new PulsarOutputFormat("testServiceUrl", null, text -> text.toString().getBytes()); + new PulsarOutputFormat("testServiceUrl", null, new AuthenticationDisabled(), text -> text.toString().getBytes()); } @Test(expectedExceptions = IllegalArgumentException.class) public void testPulsarOutputFormatConstructorWhenTopicNameIsBlank() { - new PulsarOutputFormat("testServiceUrl", " ", text -> text.toString().getBytes()); + new PulsarOutputFormat("testServiceUrl", " ", new AuthenticationDisabled(), text -> text.toString().getBytes()); } @Test(expectedExceptions = IllegalArgumentException.class) public void testPulsarOutputFormatConstructorWhenServiceUrlIsBlank() { - new PulsarOutputFormat(" ", "testTopic", text -> text.toString().getBytes()); + new PulsarOutputFormat(" ", "testTopic", new AuthenticationDisabled(), text -> text.toString().getBytes()); } @Test(expectedExceptions = NullPointerException.class) public void testPulsarOutputFormatConstructorWhenSerializationSchemaIsNull() { - new PulsarOutputFormat("testServiceUrl", "testTopic", null); + new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(), null); } @Test public void testPulsarOutputFormatWithStringSerializationSchema() throws IOException { String input = "Wolfgang Amadeus Mozart"; 
PulsarOutputFormat pulsarOutputFormat = - new PulsarOutputFormat("testServiceUrl", "testTopic", + new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(), text -> text.toString().getBytes()); assertNotNull(pulsarOutputFormat); byte[] bytes = pulsarOutputFormat.serializationSchema.serialize(input); @@ -74,7 +75,7 @@ public class PulsarOutputFormatTest { public void testPulsarOutputFormatWithCustomSerializationSchema() throws IOException { Employee employee = new Employee(1, "Test Employee", "Test Department"); PulsarOutputFormat pulsarOutputFormat = - new PulsarOutputFormat("testServiceUrl", "testTopic", + new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(), new EmployeeSerializationSchema()); assertNotNull(pulsarOutputFormat); diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java index 85bb2ed..125ee4a 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java @@ -25,6 +25,8 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.mockito.Mockito; import org.mockito.internal.util.reflection.Whitebox; import org.powermock.api.mockito.PowerMockito; @@ -39,6 +41,7 @@ import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; public class PulsarAvroTableSinkTest { private static final String SERVICE_URL = "pulsar://localhost:6650"; private static final String TOPIC_NAME = 
"test_topic"; + private static final Authentication AUTHENTICATION = new AuthenticationDisabled(); private static final String ROUTING_KEY = "name"; private final String[] fieldNames = {"id", "name","start_year","end_year"}; @@ -86,13 +89,14 @@ public class PulsarAvroTableSinkTest { private PulsarAvroTableSink spySink() throws Exception { - PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, TOPIC_NAME, ROUTING_KEY, NasaMission.class); + PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, TOPIC_NAME, AUTHENTICATION, ROUTING_KEY, NasaMission.class); FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class); PowerMockito.whenNew( FlinkPulsarProducer.class ).withArguments( Mockito.anyString(), Mockito.anyString(), + Mockito.any(Authentication.class), Mockito.any(SerializationSchema.class), Mockito.any(PulsarKeyExtractor.class) ).thenReturn(producer); diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java index 9ceefff..c42ae6c 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java @@ -24,6 +24,8 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.mockito.Mockito; import org.mockito.internal.util.reflection.Whitebox; import org.powermock.api.mockito.PowerMockito; @@ -39,6 +41,7 @@ public class PulsarJsonTableSinkTest { private static final String SERVICE_URL = "pulsar://localhost:6650"; private static final 
String TOPIC_NAME = "test_topic"; + private static final Authentication AUTHENTICATION = new AuthenticationDisabled(); private static final String ROUTING_KEY = "key"; private final String[] fieldNames = {"key", "value"}; private final TypeInformation[] typeInformations = { @@ -80,13 +83,14 @@ public class PulsarJsonTableSinkTest { } private PulsarJsonTableSink spySink() throws Exception { - PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, TOPIC_NAME, ROUTING_KEY); + PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, TOPIC_NAME, AUTHENTICATION, ROUTING_KEY); FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class); PowerMockito.whenNew( FlinkPulsarProducer.class ).withArguments( Mockito.anyString(), Mockito.anyString(), + Mockito.any(Authentication.class), Mockito.any(SerializationSchema.class), Mockito.any(PulsarKeyExtractor.class) ).thenReturn(producer);