pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: Add a running flag for Kafka source connector (#3310)
Date Thu, 24 Jan 2019 01:01:38 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 358d4e7  Add a running flag for Kafka source connector (#3310)
358d4e7 is described below

commit 358d4e75d73314a380202ab0302ede84567215ab
Author: Like <like_0903@163.com>
AuthorDate: Thu Jan 24 09:01:32 2019 +0800

    Add a running flag for Kafka source connector (#3310)
    
    ### Motivation
    
    Currently, if we pass an invalid source configuration to instantiate ```KafkaConsumer```,
we will get no log even if an exception is thrown. And after ```close()``` is called, the
background thread may still be alive unless we catch an ```InterruptedException``` when ```kafkaSourceConfig.isAutoCommitEnabled()```
    returns ```false```. So I would like to add a flag to indicate whether the connector is stopped.
---
 .../pulsar/io/kafka/KafkaAbstractSource.java       |  34 +++---
 .../io/kafka/source/KafkaAbstractSourceTest.java   | 118 +++++++++++++++++++++
 2 files changed, 138 insertions(+), 14 deletions(-)

diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index dc3a733..4f80700 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.io.kafka;
 
+import java.util.Collections;
 import java.util.Objects;
 import lombok.Getter;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -32,7 +33,6 @@ import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -46,8 +46,8 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V>
{
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaAbstractSource.class);
 
-    private Consumer<String, byte[]> consumer;
-    private Properties props;
+    private volatile Consumer<String, byte[]> consumer;
+    private volatile boolean running = false;
     private KafkaSourceConfig kafkaSourceConfig;
     private Thread runnerThread;
 
@@ -71,11 +71,10 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V>
{
         }
         if (kafkaSourceConfig.getHeartbeatIntervalMs() <= 0) {
             throw new IllegalArgumentException("Invalid Kafka Consumer heartbeatIntervalMs
: "
-                + kafkaSourceConfig.getHeartbeatIntervalMs());
+                    + kafkaSourceConfig.getHeartbeatIntervalMs());
         }
 
-        props = new Properties();
-
+        Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSourceConfig.getBootstrapServers());
         props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaSourceConfig.getGroupId());
         props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, String.valueOf(kafkaSourceConfig.getFetchMinBytes()));
@@ -85,9 +84,13 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V>
{
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getKeyDeserializationClass());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getValueDeserializationClass());
-
+        try {
+            consumer = new KafkaConsumer<>(beforeCreateConsumer(props));
+        } catch (Exception ex) {
+            throw new IllegalArgumentException("Unable to instantiate Kafka consumer", ex);
+        }
         this.start();
-
+        running = true;
     }
 
     protected Properties beforeCreateConsumer(Properties props) {
@@ -97,12 +100,13 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V>
{
     @Override
     public void close() throws InterruptedException {
         LOG.info("Stopping kafka source");
+        running = false;
         if (runnerThread != null) {
             runnerThread.interrupt();
             runnerThread.join();
             runnerThread = null;
         }
-        if(consumer != null) {
+        if (consumer != null) {
             consumer.close();
             consumer = null;
         }
@@ -112,11 +116,10 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V>
{
     public void start() {
         runnerThread = new Thread(() -> {
             LOG.info("Starting kafka source");
-            consumer = new KafkaConsumer<>(beforeCreateConsumer(props));
-            consumer.subscribe(Arrays.asList(kafkaSourceConfig.getTopic()));
+            consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
             ConsumerRecords<String, byte[]> consumerRecords;
-            while(true){
+            while (running) {
                 consumerRecords = consumer.poll(1000);
                 CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
@@ -131,13 +134,16 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V>
{
                     try {
                         CompletableFuture.allOf(futures).get();
                         consumer.commitSync();
-                    } catch (ExecutionException | InterruptedException ex) {
+                    } catch (InterruptedException ex) {
+                        break;
+                    } catch (ExecutionException ex) {
+                        LOG.error("Error while processing records", ex);
                         break;
                     }
                 }
             }
-
         });
+        runnerThread.setUncaughtExceptionHandler((t, e) -> LOG.error("[{}] Error while
consuming records", t.getName(), e));
         runnerThread.setName("Kafka Source Thread");
         runnerThread.start();
     }
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
new file mode 100644
index 0000000..4bd4b39
--- /dev/null
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.source;
+
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.kafka.KafkaAbstractSource;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+
+public class KafkaAbstractSourceTest {
+
+    private static class DummySource extends KafkaAbstractSource<String> {
+
+        @Override
+        public String extractValue(ConsumerRecord<String, byte[]> record) {
+            return new String(record.value());
+        }
+    }
+
+    @FunctionalInterface
+    public interface ThrowingRunnable {
+        void run() throws Throwable;
+    }
+
+    private static <T extends Exception> void expectThrows(Class<T> expectedType,
String expectedMessage, ThrowingRunnable runnable) {
+        try {
+            runnable.run();
+            Assert.fail();
+        } catch (Throwable e) {
+            if (expectedType.isInstance(e)) {
+                T ex = expectedType.cast(e);
+                assertEquals(expectedMessage, ex.getMessage());
+                return;
+            }
+            throw new AssertionError("Unexpected exception type, expected " + expectedType.getSimpleName()
+ " but got " + e);
+        }
+        throw new AssertionError("Expected exception");
+    }
+
+    @Test
+    public void testInvalidConfigWillThrownException() throws Exception {
+        KafkaAbstractSource source = new DummySource();
+        SourceContext ctx = new SourceContext() {
+            @Override
+            public int getInstanceId() {
+                return 0;
+            }
+
+            @Override
+            public int getNumInstances() {
+                return 0;
+            }
+
+            @Override
+            public void recordMetric(String metricName, double value) {
+
+            }
+        };
+        Map<String, Object> config = new HashMap<>();
+        ThrowingRunnable openAndClose = ()->{
+            try {
+                source.open(config, ctx);
+                fail();
+            } finally {
+                source.close();
+            }
+        };
+        expectThrows(NullPointerException.class, "Kafka topic is not set", openAndClose);
+        config.put("topic", "topic_1");
+        expectThrows(NullPointerException.class, "Kafka bootstrapServers is not set", openAndClose);
+        config.put("bootstrapServers", "localhost:8080");
+        expectThrows(NullPointerException.class, "Kafka consumer group id is not set", openAndClose);
+        config.put("groupId", "test-group");
+        config.put("fetchMinBytes", -1);
+        expectThrows(IllegalArgumentException.class, "Invalid Kafka Consumer fetchMinBytes
: -1", openAndClose);
+        config.put("fetchMinBytes", 1000);
+        config.put("autoCommitEnabled", true);
+        config.put("autoCommitIntervalMs", -1);
+        expectThrows(IllegalArgumentException.class, "Invalid Kafka Consumer autoCommitIntervalMs
: -1", openAndClose);
+        config.put("autoCommitIntervalMs", 100);
+        config.put("sessionTimeoutMs", -1);
+        expectThrows(IllegalArgumentException.class, "Invalid Kafka Consumer sessionTimeoutMs
: -1", openAndClose);
+        config.put("sessionTimeoutMs", 10000);
+        config.put("heartbeatIntervalMs", -100);
+        expectThrows(IllegalArgumentException.class, "Invalid Kafka Consumer heartbeatIntervalMs
: -100", openAndClose);
+        config.put("heartbeatIntervalMs", 20000);
+        expectThrows(IllegalArgumentException.class, "Unable to instantiate Kafka consumer",
openAndClose);
+        config.put("heartbeatIntervalMs", 5000);
+        source.open(config, ctx);
+        source.close();
+    }
+}


Mime
View raw message