flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7143) Partition assignment for Kafka consumer is not stable
Date Fri, 14 Jul 2017 12:12:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16087236#comment-16087236 ]

ASF GitHub Bot commented on FLINK-7143:

Github user tzulitai commented on the issue:

    @aljoscha on second thought, I don't think we can easily deal with the fact that,
when restoring from 1.3.1 / 1.3.0 savepoints in 1.3.2, users will not benefit from this bug fix.
    There is basically no guarantee of what the distribution would be when restoring from
1.3.1 / 1.3.0, and therefore no way to manipulate it to follow the new fixed distribution
scheme we introduce here.
    It may be possible if we force a migration to union list state in 1.3.2, but I'm not really
sure that we want to do that ..

> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>                 Key: FLINK-7143
>                 URL: https://issues.apache.org/jira/browse/FLINK-7143
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Steven Zhen Wu
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.2
> While deploying the Flink 1.3 release to hundreds of routing jobs, we found some issues with
partition assignment for the Kafka consumer: some partitions weren't assigned and some partitions
got assigned more than once.
> Here is the bug introduced in Flink 1.3.
> {code}
> protected static void initializeSubscribedPartitionsToStartOffsets(...) {
>     ...
>     for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>         if (i % numParallelSubtasks == indexOfThisSubtask) {
>             if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>                 subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
>             }
>             ...
>         }
>     }
>     ...
> }
> {code}
> The bug is using the array index {{i}} to mod against {{numParallelSubtasks}}. If {{kafkaTopicPartitions}}
has a different order among different subtasks, the assignment is not stable across subtasks,
which creates the assignment issue mentioned earlier.
> The fix is also very simple: we should use the partition id to do the mod, {{if (kafkaTopicPartitions.get(i).getPartition()
% numParallelSubtasks == indexOfThisSubtask)}}. That results in a stable assignment across
subtasks that is independent of the ordering in the array.
> Marking it as blocker because of its impact.
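The instability described above can be sketched in a small standalone demo. This is not Flink code; the class and method names below are hypothetical, and partitions are modeled as plain integer ids. It shows that modding the array index depends on the order in which each subtask happens to list the partitions, while modding the partition id does not.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo (not Flink code): compares index-based vs
// partition-id-based assignment of Kafka partitions to subtasks.
public class PartitionAssignmentDemo {

    // Buggy scheme: the result depends on where the partition sits
    // in the (possibly differently ordered) list on each subtask.
    static int assignByIndex(List<Integer> partitions, int index, int numSubtasks) {
        return index % numSubtasks;
    }

    // Fixed scheme: the result depends only on the partition id,
    // so it is identical on every subtask regardless of list order.
    static int assignByPartitionId(List<Integer> partitions, int index, int numSubtasks) {
        return partitions.get(index) % numSubtasks;
    }

    public static void main(String[] args) {
        int numSubtasks = 2;
        // Two subtasks may discover the same four partitions in different orders.
        List<Integer> orderA = Arrays.asList(0, 1, 2, 3);
        List<Integer> orderB = Arrays.asList(3, 2, 1, 0);

        // Index-based: partition 3 sits at index 3 in orderA but index 0 in
        // orderB, so the two subtasks disagree about who owns it.
        System.out.println("index-based, partition 3 via orderA -> subtask "
                + assignByIndex(orderA, 3, numSubtasks));
        System.out.println("index-based, partition 3 via orderB -> subtask "
                + assignByIndex(orderB, 0, numSubtasks));

        // Partition-id-based: partition 3 maps to the same subtask in both orders.
        System.out.println("id-based, partition 3 via orderA -> subtask "
                + assignByPartitionId(orderA, 3, numSubtasks));
        System.out.println("id-based, partition 3 via orderB -> subtask "
                + assignByPartitionId(orderB, 0, numSubtasks));
    }
}
```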

This message was sent by Atlassian JIRA
