pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: [Pulsar-Flink] Extends Validations (#3063)
Date Tue, 27 Nov 2018 02:30:35 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5438659  [Pulsar-Flink] Extends Validations (#3063)
5438659 is described below

commit 5438659ae6b8b911ed1c85b0645c92229a85b472
Author: Eren Avsarogullari <erenavsarogullari@gmail.com>
AuthorDate: Tue Nov 27 02:30:30 2018 +0000

    [Pulsar-Flink] Extends Validations (#3063)
    
    ### Motivation
    This PR aims to extend validations on Flink Connector.
    
    ### Modifications
    1- `FlinkPulsarProducer` constructor needs to be robust for **blank** values.
    2- `PulsarSourceBuilder` needs to be robust for **blank** values.
    3- `totalMessageCount` variable looks redundant so can be removed on `PulsarConsumerSource`
    4- `PulsarSourceBuilder` UT coverage is added.
    
    ### Test Coverage
    New UT coverage is added.
---
 .../connectors/pulsar/FlinkPulsarProducer.java     |   8 +-
 .../connectors/pulsar/PulsarConsumerSource.java    |   3 -
 .../connectors/pulsar/PulsarSourceBuilder.java     |  16 ++--
 .../connectors/pulsar/PulsarSourceBuilderTest.java | 104 +++++++++++++++++++++
 4 files changed, 119 insertions(+), 12 deletions(-)

diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 48bc0f1..bced297 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flink.streaming.connectors.pulsar;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.java.ClosureCleaner;
@@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.function.Function;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -121,8 +123,10 @@ public class FlinkPulsarProducer<IN>
                                SerializationSchema<IN> serializationSchema,
                                ProducerConfiguration producerConfig,
                                PulsarKeyExtractor<IN> keyExtractor) {
-        this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
-        this.defaultTopicName = checkNotNull(defaultTopicName, "TopicName not set");
+        checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
+        checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
+        this.serviceUrl = serviceUrl;
+        this.defaultTopicName = defaultTopicName;
         this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
         this.producerConfig = checkNotNull(producerConfig, "Producer Config is not set");
         this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 0d01def..84e0e50 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -66,7 +66,6 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T,
MessageI
 
     private final long acknowledgementBatchSize;
     private long batchCount;
-    private long totalMessageCount;
 
     private transient volatile boolean isRunning;
 
@@ -147,14 +146,12 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T,
MessageI
                 return;
             }
             context.collect(deserialize(message));
-            totalMessageCount++;
         }
     }
 
     private void emitAutoAcking(SourceContext<T> context, Message message) throws IOException
{
         context.collect(deserialize(message));
         batchCount++;
-        totalMessageCount++;
         if (batchCount >= acknowledgementBatchSize) {
             LOG.info("processed {} messages acknowledging messageId {}", batchCount, message.getMessageId());
             consumer.acknowledgeCumulative(message.getMessageId());
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index 3396983..9605f07 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flink.streaming.connectors.pulsar;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -29,9 +30,9 @@ import org.apache.flink.util.Preconditions;
 @PublicEvolving
 public class PulsarSourceBuilder<T> {
 
-    static final String SERVICE_URL = "pulsar://localhost:6650";
-    static final long ACKNOWLEDGEMENT_BATCH_SIZE = 100;
-    static final long MAX_ACKNOWLEDGEMENT_BATCH_SIZE = 1000;
+    private static final String SERVICE_URL = "pulsar://localhost:6650";
+    private static final long ACKNOWLEDGEMENT_BATCH_SIZE = 100;
+    private static final long MAX_ACKNOWLEDGEMENT_BATCH_SIZE = 1000;
 
     final DeserializationSchema<T> deserializationSchema;
     String serviceUrl = SERVICE_URL;
@@ -50,7 +51,7 @@ public class PulsarSourceBuilder<T> {
      * @return this builder
      */
     public PulsarSourceBuilder<T> serviceUrl(String serviceUrl) {
-        Preconditions.checkNotNull(serviceUrl);
+        Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot
be blank");
         this.serviceUrl = serviceUrl;
         return this;
     }
@@ -66,7 +67,7 @@ public class PulsarSourceBuilder<T> {
      * @return this builder
      */
     public PulsarSourceBuilder<T> topic(String topic) {
-        Preconditions.checkNotNull(topic);
+        Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank");
         this.topic = topic;
         return this;
     }
@@ -78,7 +79,8 @@ public class PulsarSourceBuilder<T> {
      * @return this builder
      */
     public PulsarSourceBuilder<T> subscriptionName(String subscriptionName) {
-        Preconditions.checkNotNull(subscriptionName);
+        Preconditions.checkArgument(StringUtils.isNotBlank(subscriptionName),
+                "subscriptionName cannot be blank");
         this.subscriptionName = subscriptionName;
         return this;
     }
@@ -112,7 +114,7 @@ public class PulsarSourceBuilder<T> {
      * @return a builder
      */
     public static <T> PulsarSourceBuilder<T> builder(DeserializationSchema<T>
deserializationSchema) {
-        Preconditions.checkNotNull(deserializationSchema);
+        Preconditions.checkNotNull(deserializationSchema, "deserializationSchema cannot be
null");
         return new PulsarSourceBuilder<>(deserializationSchema);
     }
 }
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
new file mode 100644
index 0000000..5a916e8
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Tests for PulsarSourceBuilder
+ */
+public class PulsarSourceBuilderTest {
+
+    private PulsarSourceBuilder pulsarSourceBuilder;
+
+    @Before
+    public void before() {
+        pulsarSourceBuilder = PulsarSourceBuilder.builder(new TestDeserializationSchema());
+    }
+
+    @Test
+    public void testBuild() {
+        SourceFunction sourceFunction = pulsarSourceBuilder
+                .serviceUrl("testServiceUrl")
+                .topic("testTopic")
+                .subscriptionName("testSubscriptionName")
+                .build();
+        Assert.assertNotNull(sourceFunction);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testBuildWithoutSettingRequiredProperties() {
+        pulsarSourceBuilder.build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testServiceUrlWithNull() {
+        pulsarSourceBuilder.serviceUrl(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testServiceUrlWithBlank() {
+        pulsarSourceBuilder.serviceUrl(" ");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTopicWithNull() {
+        pulsarSourceBuilder.topic(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTopicWithBlank() {
+        pulsarSourceBuilder.topic(" ");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSubscriptionNameWithNull() {
+        pulsarSourceBuilder.subscriptionName(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSubscriptionNameWithBlank() {
+        pulsarSourceBuilder.subscriptionName(" ");
+    }
+
+    private class TestDeserializationSchema<T> implements DeserializationSchema<T>
{
+
+        @Override
+        public T deserialize(byte[] bytes) throws IOException {
+            return null;
+        }
+
+        @Override
+        public boolean isEndOfStream(T t) {
+            return false;
+        }
+
+        @Override
+        public TypeInformation<T> getProducedType() {
+            return null;
+        }
+    }
+}


Mime
View raw message