flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ar...@apache.org
Subject [flink] branch release-1.12 updated: [FLINK-24051][connectors/kafka] Make groupId optional when constructing a KafkaSource
Date Wed, 01 Sep 2021 06:46:39 GMT
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 39d834f  [FLINK-24051][connectors/kafka] Make groupId optional when constructing
a KafkaSource
39d834f is described below

commit 39d834f1559dd71cbc289ef2c8aae889f9443899
Author: Fabian Paul <fabianpaul@ververica.com>
AuthorDate: Mon Aug 30 14:09:46 2021 +0200

    [FLINK-24051][connectors/kafka] Make groupId optional when constructing a KafkaSource
    
    Setting a groupId for the KafkaSource is often not necessary and
    complicates the setup for users that do not rely on specific
    semantics which are implied by the groupId.
---
 .../connector/kafka/source/KafkaSourceBuilder.java | 10 ++---
 .../kafka/source/KafkaSourceBuilderTest.java       | 51 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 7 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
index 3a7efc9..0099df3 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
@@ -52,14 +52,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * KafkaSource<String> source = KafkaSource
  *     .<String>builder()
  *     .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
- *     .setGroupId("myGroup")
  *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
  *     .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
  *     .build();
  * }</pre>
  *
- * <p>The bootstrap servers, group id, topics/partitions to consume, and the record
deserializer are
- * required fields that must be set.
+ * <p>The bootstrap servers, topics/partitions to consume, and the record deserializer
are required
+ * fields that must be set.
  *
  * <p>To specify the starting offsets of the KafkaSource, one can call {@link
  * #setStartingOffsets(OffsetsInitializer)}.
@@ -74,7 +73,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * KafkaSource<String> source = KafkaSource
  *     .<String>builder()
  *     .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
- *     .setGroupId("myGroup")
  *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
  *     .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
  *     .setUnbounded(OffsetsInitializer.latest())
@@ -86,9 +84,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class KafkaSourceBuilder<OUT> {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceBuilder.class);
-    private static final String[] REQUIRED_CONFIGS = {
-        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ConsumerConfig.GROUP_ID_CONFIG
-    };
+    private static final String[] REQUIRED_CONFIGS = {ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG};
     // The subscriber specifies the partitions to subscribe to.
     private KafkaSubscriber subscriber;
     // Users can specify the starting / stopping offset initializer.
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
new file mode 100644
index 0000000..e83a3d3
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Test;
+
+/** Tests for {@link KafkaSourceBuilder}. */
+public class KafkaSourceBuilderTest extends TestLogger {
+
+    @Test
+    public void testBuildSourceWithoutGroupId() {
+        new KafkaSourceBuilder<String>()
+                .setBootstrapServers("testServer")
+                .setTopics("topic")
+                .setDeserializer(
+                        new KafkaRecordDeserializer<String>() {
+                            @Override
+                            public TypeInformation<String> getProducedType() {
+                                return null;
+                            }
+
+                            @Override
+                            public void deserialize(
+                                    ConsumerRecord<byte[], byte[]> record,
+                                    Collector<String> collector)
+                                    throws Exception {}
+                        })
+                .build();
+    }
+}

Mime
View raw message