From: fhueske@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Thu, 06 Apr 2017 21:08:45 -0000
Message-Id: <22100354cfb544b99130fe5960c07a3c@git.apache.org>
Subject: [03/11] flink git commit: [FLINK-6079] [kafka] Provide meaningful error message if TopicPartitions are null

[FLINK-6079] [kafka] Provide meaningful error message if TopicPartitions are null

This closes #3685.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8890a8db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8890a8db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8890a8db

Branch: refs/heads/table-retraction
Commit: 8890a8db41c45504aa658a1942f40bb9af7dcf30
Parents: a6355ed
Author: zentol
Authored: Thu Apr 6 11:55:29 2017 +0200
Committer: zentol
Committed: Thu Apr 6 19:35:50 2017 +0200

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBase.java | 114 +++++++++----------
 1 file changed, 57 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8890a8db/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index d409027..a35e710 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -348,71 +349,70 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		// initialize subscribed partitions
 		List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
+		Preconditions.checkNotNull(kafkaTopicPartitions, "TopicPartitions must not be null.");
 		subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size());
 
-		if (kafkaTopicPartitions != null) {
-			if (restoredState != null) {
-				for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
-					if (restoredState.containsKey(kafkaTopicPartition)) {
-						subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
-					}
+		if (restoredState != null) {
+			for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
+				if (restoredState.containsKey(kafkaTopicPartition)) {
+					subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
 				}
+			}
 
-				LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
-					getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
-			} else {
-				initializeSubscribedPartitionsToStartOffsets(
-					subscribedPartitionsToStartOffsets,
-					kafkaTopicPartitions,
-					getRuntimeContext().getIndexOfThisSubtask(),
-					getRuntimeContext().getNumberOfParallelSubtasks(),
-					startupMode,
-					specificStartupOffsets);
-
-				if (subscribedPartitionsToStartOffsets.size() != 0) {
-					switch (startupMode) {
-						case EARLIEST:
-							LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
-								getRuntimeContext().getIndexOfThisSubtask(),
-								subscribedPartitionsToStartOffsets.size(),
-								subscribedPartitionsToStartOffsets.keySet());
-							break;
-						case LATEST:
-							LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
-								getRuntimeContext().getIndexOfThisSubtask(),
-								subscribedPartitionsToStartOffsets.size(),
-								subscribedPartitionsToStartOffsets.keySet());
-							break;
-						case SPECIFIC_OFFSETS:
-							LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
-								getRuntimeContext().getIndexOfThisSubtask(),
-								subscribedPartitionsToStartOffsets.size(),
-								specificStartupOffsets,
-								subscribedPartitionsToStartOffsets.keySet());
-
-							List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
-							for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
-								if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
-									partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
-								}
+			LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
+				getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
+		} else {
+			initializeSubscribedPartitionsToStartOffsets(
+				subscribedPartitionsToStartOffsets,
+				kafkaTopicPartitions,
+				getRuntimeContext().getIndexOfThisSubtask(),
+				getRuntimeContext().getNumberOfParallelSubtasks(),
+				startupMode,
+				specificStartupOffsets);
+
+			if (subscribedPartitionsToStartOffsets.size() != 0) {
+				switch (startupMode) {
+					case EARLIEST:
+						LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
+							getRuntimeContext().getIndexOfThisSubtask(),
+							subscribedPartitionsToStartOffsets.size(),
+							subscribedPartitionsToStartOffsets.keySet());
+						break;
+					case LATEST:
+						LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
+							getRuntimeContext().getIndexOfThisSubtask(),
+							subscribedPartitionsToStartOffsets.size(),
+							subscribedPartitionsToStartOffsets.keySet());
+						break;
+					case SPECIFIC_OFFSETS:
+						LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
+							getRuntimeContext().getIndexOfThisSubtask(),
+							subscribedPartitionsToStartOffsets.size(),
+							specificStartupOffsets,
+							subscribedPartitionsToStartOffsets.keySet());
+
+						List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
+						for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
+							if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+								partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
 							}
+						}
 
-							if (partitionsDefaultedToGroupOffsets.size() > 0) {
-								LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
-									"; their startup offsets will be defaulted to their committed group offsets in Kafka.",
-									getRuntimeContext().getIndexOfThisSubtask(),
-									partitionsDefaultedToGroupOffsets.size(),
-									partitionsDefaultedToGroupOffsets);
-							}
-							break;
-						default:
-						case GROUP_OFFSETS:
-							LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
+						if (partitionsDefaultedToGroupOffsets.size() > 0) {
+							LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
+								"; their startup offsets will be defaulted to their committed group offsets in Kafka.",
 								getRuntimeContext().getIndexOfThisSubtask(),
-								subscribedPartitionsToStartOffsets.size(),
-								subscribedPartitionsToStartOffsets.keySet());
-					}
+								partitionsDefaultedToGroupOffsets.size(),
+								partitionsDefaultedToGroupOffsets);
+						}
+						break;
+					default:
+					case GROUP_OFFSETS:
+						LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
+							getRuntimeContext().getIndexOfThisSubtask(),
+							subscribedPartitionsToStartOffsets.size(),
+							subscribedPartitionsToStartOffsets.keySet());
 				}
 			}
 		}
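----------------------------------------------------------------------

Note on the failure mode this commit fixes: before the change, a null return
from getKafkaPartitions(topics) surfaced as a bare NullPointerException at the
"new HashMap<>(kafkaTopicPartitions.size())" call, because the old
"if (kafkaTopicPartitions != null)" guard sat after that allocation and was
effectively dead code. The minimal sketch below reproduces this outside Flink;
the class name and the fetchPartitions() helper are hypothetical stand-ins,
while org.apache.flink.util.Preconditions is the same utility the commit
introduces.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.util.Preconditions;

    public class NullPartitionsDemo {

        // Hypothetical stand-in for a getKafkaPartitions(topics) override
        // that (incorrectly) returns null instead of an empty list.
        private static List<String> fetchPartitions() {
            return null;
        }

        public static void main(String[] args) {
            List<String> partitions = fetchPartitions();

            // After FLINK-6079: fail fast with a meaningful message, i.e.
            // java.lang.NullPointerException: TopicPartitions must not be null.
            Preconditions.checkNotNull(partitions, "TopicPartitions must not be null.");

            // Before the fix, execution reached this allocation first and died
            // with a message-less NPE; the old null guard came too late.
            Map<String, Long> offsets = new HashMap<>(partitions.size());
            System.out.println(offsets);
        }
    }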