flink-commits mailing list archives

From fhue...@apache.org
Subject [03/11] flink git commit: [FLINK-6079] [kafka] Provide meaningful error message if TopicPartitions are null
Date Thu, 06 Apr 2017 21:08:45 GMT
[FLINK-6079] [kafka] Provide meaningful error message if TopicPartitions are null

This closes #3685.
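
For context: the patch below replaces a silent null guard with Flink's
Preconditions.checkNotNull(reference, errorMessage), which fails fast by
throwing a NullPointerException that carries the supplied message. A minimal
sketch of that behavior (the demo class and the null list are hypothetical;
only org.apache.flink.util.Preconditions is taken from the patch):

import java.util.List;

import org.apache.flink.util.Preconditions;

public class CheckNotNullDemo {
	public static void main(String[] args) {
		// Hypothetical: a partition discovery call that returned null.
		List<String> kafkaTopicPartitions = null;

		// Fails fast with the message below instead of a bare NPE from a
		// later dereference such as kafkaTopicPartitions.size().
		Preconditions.checkNotNull(kafkaTopicPartitions, "TopicPartitions must not be null.");
	}
}

Running this throws "java.lang.NullPointerException: TopicPartitions must not
be null." rather than an unexplained NPE further down the line.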


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8890a8db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8890a8db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8890a8db

Branch: refs/heads/table-retraction
Commit: 8890a8db41c45504aa658a1942f40bb9af7dcf30
Parents: a6355ed
Author: zentol <chesnay@apache.org>
Authored: Thu Apr 6 11:55:29 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Thu Apr 6 19:35:50 2017 +0200

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBase.java           | 114 +++++++++----------
 1 file changed, 57 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8890a8db/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index d409027..a35e710 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -348,71 +349,70 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 		// initialize subscribed partitions
 		List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
+		Preconditions.checkNotNull(kafkaTopicPartitions, "TopicPartitions must not be null.");
 
 		subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size());
 
-		if (kafkaTopicPartitions != null) {
-			if (restoredState != null) {
-				for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
-					if (restoredState.containsKey(kafkaTopicPartition)) {
-						subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
-					}
+		if (restoredState != null) {
+			for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
+				if (restoredState.containsKey(kafkaTopicPartition)) {
+					subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
 				}
+			}
 
-				LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
-					getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
-			} else {
-				initializeSubscribedPartitionsToStartOffsets(
-					subscribedPartitionsToStartOffsets,
-					kafkaTopicPartitions,
-					getRuntimeContext().getIndexOfThisSubtask(),
-					getRuntimeContext().getNumberOfParallelSubtasks(),
-					startupMode,
-					specificStartupOffsets);
-
-				if (subscribedPartitionsToStartOffsets.size() != 0) {
-					switch (startupMode) {
-						case EARLIEST:
-							LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
-								getRuntimeContext().getIndexOfThisSubtask(),
-								subscribedPartitionsToStartOffsets.size(),
-								subscribedPartitionsToStartOffsets.keySet());
-							break;
-						case LATEST:
-							LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
-								getRuntimeContext().getIndexOfThisSubtask(),
-								subscribedPartitionsToStartOffsets.size(),
-								subscribedPartitionsToStartOffsets.keySet());
-							break;
-						case SPECIFIC_OFFSETS:
-							LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
-								getRuntimeContext().getIndexOfThisSubtask(),
-								subscribedPartitionsToStartOffsets.size(),
-								specificStartupOffsets,
-								subscribedPartitionsToStartOffsets.keySet());
-
-							List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
-							for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
-								if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
-									partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
-								}
+			LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
+				getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
+		} else {
+			initializeSubscribedPartitionsToStartOffsets(
+				subscribedPartitionsToStartOffsets,
+				kafkaTopicPartitions,
+				getRuntimeContext().getIndexOfThisSubtask(),
+				getRuntimeContext().getNumberOfParallelSubtasks(),
+				startupMode,
+				specificStartupOffsets);
+
+			if (subscribedPartitionsToStartOffsets.size() != 0) {
+				switch (startupMode) {
+					case EARLIEST:
+						LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
+							getRuntimeContext().getIndexOfThisSubtask(),
+							subscribedPartitionsToStartOffsets.size(),
+							subscribedPartitionsToStartOffsets.keySet());
+						break;
+					case LATEST:
+						LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
+							getRuntimeContext().getIndexOfThisSubtask(),
+							subscribedPartitionsToStartOffsets.size(),
+							subscribedPartitionsToStartOffsets.keySet());
+						break;
+					case SPECIFIC_OFFSETS:
+						LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
+							getRuntimeContext().getIndexOfThisSubtask(),
+							subscribedPartitionsToStartOffsets.size(),
+							specificStartupOffsets,
+							subscribedPartitionsToStartOffsets.keySet());
+
+						List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
+						for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
+							if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+								partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
 							}
+						}
 
-							if (partitionsDefaultedToGroupOffsets.size() > 0) {
-								LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
-										"; their startup offsets will be defaulted to their committed group offsets in Kafka.",
-									getRuntimeContext().getIndexOfThisSubtask(),
-									partitionsDefaultedToGroupOffsets.size(),
-									partitionsDefaultedToGroupOffsets);
-							}
-							break;
-						default:
-						case GROUP_OFFSETS:
-							LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
+						if (partitionsDefaultedToGroupOffsets.size() > 0) {
+							LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
+									"; their startup offsets will be defaulted to their committed group offsets in Kafka.",
 								getRuntimeContext().getIndexOfThisSubtask(),
-								subscribedPartitionsToStartOffsets.size(),
-								subscribedPartitionsToStartOffsets.keySet());
-					}
+								partitionsDefaultedToGroupOffsets.size(),
+								partitionsDefaultedToGroupOffsets);
+						}
+						break;
+					default:
+					case GROUP_OFFSETS:
+						LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
+							getRuntimeContext().getIndexOfThisSubtask(),
+							subscribedPartitionsToStartOffsets.size(),
+							subscribedPartitionsToStartOffsets.keySet());
 				}
 			}
 		}
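
Note on the shape of this hunk: once the explicit precondition replaces the
old "if (kafkaTopicPartitions != null)" guard, the guarded body loses one
level of indentation, so most of the changed lines above are indentation-only.
The old guard also appears to have been ineffective: the unchanged line that
creates the HashMap already calls kafkaTopicPartitions.size() before the
check was reached, so a null partition list would have failed anyway, just
with a bare NullPointerException instead of a meaningful message.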

