flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [1/2] flink git commit: [FLINK-8190] [kafka] Add constructors to expose topic pattern-based subscription
Date Mon, 04 Dec 2017 16:43:29 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.4 928daedfe -> f38d9403e


[FLINK-8190] [kafka] Add constructors to expose topic pattern-based subscription

This closes #5117.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/810be6dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/810be6dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/810be6dc

Branch: refs/heads/release-1.4
Commit: 810be6dcae3eb5af2d56a61bf6be5dff1cdf149a
Parents: 928daed
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Mon Dec 4 15:05:46 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Dec 5 00:41:51 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    | 55 ++++++++++++++++-
 .../connectors/kafka/FlinkKafkaConsumer010.java | 45 ++++++++++++++
 .../connectors/kafka/FlinkKafkaConsumer011.java | 45 ++++++++++++++
 .../connectors/kafka/FlinkKafkaConsumer08.java  | 62 ++++++++++++++++++-
 .../connectors/kafka/FlinkKafkaConsumer09.java  | 64 +++++++++++++++++++-
 5 files changed, 265 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/810be6dc/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 5376d5b..6c80370 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -294,7 +294,9 @@ Flink on YARN supports automatic restart of lost YARN containers.
 
 If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets
to Zookeeper.
 
-### Kafka Consumers Partition Discovery
+### Kafka Consumers Topic and Partition Discovery
+
+#### Partition discovery
 
 The Flink Kafka Consumer supports discovering dynamically created Kafka partitions, and consumes
them with
 exactly-once guarantees. All partitions discovered after the initial retrieval of partition
metadata (i.e., when the
@@ -309,6 +311,57 @@ prior to Flink 1.3.x, partition discovery cannot be enabled on the restore
run.
 with an exception. In this case, in order to use partition discovery, please first take a
savepoint in Flink 1.3.x and
 then restore again from that.
 
+#### Topic discovery
+
+At a higher level, the Flink Kafka Consumer is also capable of discovering topics, based
on pattern matching on the
+topic names using regular expressions. See below for an example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+properties.setProperty("group.id", "test");
+
+FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
+    java.util.regex.Pattern.compile("test-topic-[0-9]"),
+    new SimpleStringSchema(),
+    properties);
+
+DataStream<String> stream = env.addSource(myConsumer);
+...
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val properties = new Properties()
+properties.setProperty("bootstrap.servers", "localhost:9092")
+properties.setProperty("group.id", "test")
+
+val myConsumer = new FlinkKafkaConsumer011[String](
+  java.util.regex.Pattern.compile("test-topic-[0-9]"),
+  new SimpleStringSchema,
+  properties)
+
+val stream = env.addSource(myConsumer)
+...
+{% endhighlight %}
+</div>
+</div>
+
+In the above example, all topics with names that match the specified regular expression
+(starting with `test-topic-` and ending with a single digit) will be subscribed to by the
+consumer when the job starts running.
+
+To allow the consumer to discover dynamically created topics after the job has started running,
+set a non-negative value for `flink.partition-discovery.interval-millis`. This allows
+the consumer to discover partitions of new topics with names that also match the specified
+pattern.
+
 ### Kafka Consumers Offset Committing Behaviour Configuration
 
 The Flink Kafka Consumer allows configuring the behaviour of how offsets

http://git-wip-us.apache.org/repos/asf/flink/blob/810be6dc/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index f569477..6fb63e1 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -39,6 +40,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Pattern;
 
 /**
  * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream
from
@@ -126,6 +128,49 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T>
{
 		super(topics, deserializer, props);
 	}
 
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.10.x. Use this constructor
to
+	 * subscribe to multiple topics based on a regular expression pattern.
+	 *
+	 * <p>If partition discovery is enabled (by setting a non-negative value for
+	 * {@link FlinkKafkaConsumer010#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties),
topics
+	 * with names matching the pattern will also be subscribed to as they are created on the
fly.
+	 *
+	 * @param subscriptionPattern
+	 *           The regular expression for a pattern of topic names to subscribe to.
+	 * @param valueDeserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's
objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper
client.
+	 */
+	@PublicEvolving
+	public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T>
valueDeserializer, Properties props) {
+		this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer),
props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.10.x. Use this constructor
to
+	 * subscribe to multiple topics based on a regular expression pattern.
+	 *
+	 * <p>If partition discovery is enabled (by setting a non-negative value for
+	 * {@link FlinkKafkaConsumer010#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties),
topics
+	 * with names matching the pattern will also be subscribed to as they are created on the
fly.
+	 *
+	 * <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading
key/value
+	 * pairs, offsets, and topic names from Kafka.
+	 *
+	 * @param subscriptionPattern
+	 *           The regular expression for a pattern of topic names to subscribe to.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and
Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper
client.
+	 */
+	@PublicEvolving
+	public FlinkKafkaConsumer010(Pattern subscriptionPattern, KeyedDeserializationSchema<T>
deserializer, Properties props) {
+		super(subscriptionPattern, deserializer, props);
+	}
+
 	@Override
 	protected AbstractFetcher<T, ?> createFetcher(
 			SourceContext<T> sourceContext,

http://git-wip-us.apache.org/repos/asf/flink/blob/810be6dc/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
index 6f75828..c40463e 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -24,6 +25,7 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaW
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+import java.util.regex.Pattern;
 
 /**
  * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream
from
@@ -110,4 +112,47 @@ public class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T>
{
 	public FlinkKafkaConsumer011(List<String> topics, KeyedDeserializationSchema<T>
deserializer, Properties props) {
 		super(topics, deserializer, props);
 	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.11.x. Use this constructor
to
+	 * subscribe to multiple topics based on a regular expression pattern.
+	 *
+	 * <p>If partition discovery is enabled (by setting a non-negative value for
+	 * {@link FlinkKafkaConsumer011#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties),
topics
+	 * with names matching the pattern will also be subscribed to as they are created on the
fly.
+	 *
+	 * @param subscriptionPattern
+	 *           The regular expression for a pattern of topic names to subscribe to.
+	 * @param valueDeserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's
objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper
client.
+	 */
+	@PublicEvolving
+	public FlinkKafkaConsumer011(Pattern subscriptionPattern, DeserializationSchema<T>
valueDeserializer, Properties props) {
+		this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer),
props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.11.x. Use this constructor
to
+	 * subscribe to multiple topics based on a regular expression pattern.
+	 *
+	 * <p>If partition discovery is enabled (by setting a non-negative value for
+	 * {@link FlinkKafkaConsumer011#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties),
topics
+	 * with names matching the pattern will also be subscribed to as they are created on the
fly.
+	 *
+	 * <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading
key/value
+	 * pairs, offsets, and topic names from Kafka.
+	 *
+	 * @param subscriptionPattern
+	 *           The regular expression for a pattern of topic names to subscribe to.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and
Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper
client.
+	 */
+	@PublicEvolving
+	public FlinkKafkaConsumer011(Pattern subscriptionPattern, KeyedDeserializationSchema<T>
deserializer, Properties props) {
+		super(subscriptionPattern, deserializer, props);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/810be6dc/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 0a70f61..f362046 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -39,6 +40,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.PropertiesUtil.getLong;
@@ -156,13 +158,67 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T>
{
 	 *           The properties that are used to configure both the fetcher and the offset handler.
 	 */
 	public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T>
deserializer, Properties props) {
+		this(topics, null, deserializer, props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x. Use this constructor to
+	 * subscribe to multiple topics based on a regular expression pattern.
+	 *
+	 * <p>If partition discovery is enabled (by setting a non-negative value for
+	 * {@link FlinkKafkaConsumer08#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties),
topics
+	 * with names matching the pattern will also be subscribed to as they are created on the
fly.
+	 *
+	 * @param subscriptionPattern
+	 *           The regular expression for a pattern of topic names to subscribe to.
+	 * @param valueDeserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's
objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper
client.
+	 */
+	@PublicEvolving
+	public FlinkKafkaConsumer08(Pattern subscriptionPattern, DeserializationSchema<T>
valueDeserializer, Properties props) {
+		this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer),
props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x. Use this constructor to
+	 * subscribe to multiple topics based on a regular expression pattern.
+	 *
+	 * <p>If partition discovery is enabled (by setting a non-negative value for
+	 * {@link FlinkKafkaConsumer08#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties),
topics
+	 * with names matching the pattern will also be subscribed to as they are created on the
fly.
+	 *
+	 * <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading
key/value
+	 * pairs, offsets, and topic names from Kafka.
+	 *
+	 * @param subscriptionPattern
+	 *           The regular expression for a pattern of topic names to subscribe to.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and
Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper
client.
+	 */
+	@PublicEvolving
+	public FlinkKafkaConsumer08(Pattern subscriptionPattern, KeyedDeserializationSchema<T>
deserializer, Properties props) {
+		this(null, subscriptionPattern, deserializer, props);
+	}
+
+	private FlinkKafkaConsumer08(
+			List<String> topics,
+			Pattern subscriptionPattern,
+			KeyedDeserializationSchema<T> deserializer,
+			Properties props) {
+
 		super(
 				topics,
-				null,
+				subscriptionPattern,
 				deserializer,
-				getLong(props, KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
+				getLong(
+					checkNotNull(props, "props"),
+					KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
 
-		this.kafkaProperties = checkNotNull(props, "props");
+		this.kafkaProperties = props;
 
 		// validate the zookeeper properties
 		validateZooKeeperConfig(props);

http://git-wip-us.apache.org/repos/asf/flink/blob/810be6dc/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 65be712..79be73c 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -42,6 +43,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.PropertiesUtil.getLong;
@@ -147,9 +149,67 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T>
{
 	 *           The properties that are used to configure both the fetcher and the offset handler.
 	 */
 	public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T>
deserializer, Properties props) {
-		super(topics, null, deserializer, getLong(props, KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
PARTITION_DISCOVERY_DISABLED));
+		this(topics, null, deserializer, props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.9.x. Use this constructor to
+	 * subscribe to multiple topics based on a regular expression pattern.
+	 *
+	 * <p>If partition discovery is enabled (by setting a non-negative value for
+	 * {@link FlinkKafkaConsumer09#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties),
topics
+	 * with names matching the pattern will also be subscribed to as they are created on the
fly.
+	 *
+	 * @param subscriptionPattern
+	 *           The regular expression for a pattern of topic names to subscribe to.
+	 * @param valueDeserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's
objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper
client.
+	 */
+	@PublicEvolving
+	public FlinkKafkaConsumer09(Pattern subscriptionPattern, DeserializationSchema<T>
valueDeserializer, Properties props) {
+		this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer),
props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.9.x. Use this constructor to
+	 * subscribe to multiple topics based on a regular expression pattern.
+	 *
+	 * <p>If partition discovery is enabled (by setting a non-negative value for
+	 * {@link FlinkKafkaConsumer09#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties),
topics
+	 * with names matching the pattern will also be subscribed to as they are created on the
fly.
+	 *
+	 * <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading
key/value
+	 * pairs, offsets, and topic names from Kafka.
+	 *
+	 * @param subscriptionPattern
+	 *           The regular expression for a pattern of topic names to subscribe to.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and
Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper
client.
+	 */
+	@PublicEvolving
+	public FlinkKafkaConsumer09(Pattern subscriptionPattern, KeyedDeserializationSchema<T>
deserializer, Properties props) {
+		this(null, subscriptionPattern, deserializer, props);
+	}
+
+	private FlinkKafkaConsumer09(
+			List<String> topics,
+			Pattern subscriptionPattern,
+			KeyedDeserializationSchema<T> deserializer,
+			Properties props) {
+
+		super(
+				topics,
+				subscriptionPattern,
+				deserializer,
+				getLong(
+					checkNotNull(props, "props"),
+					KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
 
-		this.properties = checkNotNull(props, "props");
+		this.properties = props;
 		setDeserializer(this.properties);
 
 		// configure the polling timeout


Mime
View raw message