flink-issues mailing list archives

From "Steven Zhen Wu (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7143) Partition assignment for Kafka consumer is not stable
Date Tue, 11 Jul 2017 17:43:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082619#comment-16082619 ]

Steven Zhen Wu commented on FLINK-7143:
---------------------------------------

[~aljoscha] [~tzulitai] Agree that {{partitionId % parallelism}} is probably not a good idea
for the multiple-topics case. What about the old sorting approach? Sort the list by (topic, partition)
tuple first, then do a simple round-robin assignment (mod). The advantage is that the assignment
pattern is much easier to see (compared to hashCode), which usually helps with debugging:
it is easier to figure out the expected assignment.

Here is an example with 3 topics and a parallelism of 4:
{code}
[subtask-id] topic, partition
[0] t1, p0
[1] t1, p1
[2] t1, p2
[3] t2, p0
[0] t3, p0
[1] t3, p1
{code}
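
For illustration, here is a minimal standalone sketch of that scheme. {{TopicPartition}} and the
method names below are hypothetical stand-ins, not Flink's actual classes:
{code}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative stand-in for Flink's Kafka partition descriptor.
class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

class RoundRobinAssignmentSketch {
    // Sort by (topic, partition) so every subtask sees the same order, then
    // assign round-robin: position in the sorted list mod parallelism.
    static List<TopicPartition> assign(
            List<TopicPartition> all, int parallelism, int subtaskIndex) {
        List<TopicPartition> sorted = new ArrayList<>(all);
        sorted.sort(Comparator
                .comparing((TopicPartition tp) -> tp.topic)
                .thenComparingInt(tp -> tp.partition));
        List<TopicPartition> mine = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                mine.add(sorted.get(i));
            }
        }
        return mine;
    }
}
{code}
Because every subtask sorts the same set of partitions the same way, all subtasks agree on the
assignment, and the pattern in the example above falls out directly.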



> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>
>                 Key: FLINK-7143
>                 URL: https://issues.apache.org/jira/browse/FLINK-7143
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Steven Zhen Wu
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.2
>
>
> While deploying the Flink 1.3 release to hundreds of routing jobs, we found some issues with partition assignment for the Kafka consumer: some partitions weren't assigned and some partitions got assigned more than once.
> Here is the bug introduced in Flink 1.3:
> {code}
> protected static void initializeSubscribedPartitionsToStartOffsets(...) {
>     ...
>     for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>         if (i % numParallelSubtasks == indexOfThisSubtask) {
>             if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>                 subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
>             }
>     ...
> }
> {code}
> The bug is using the array index {{i}} to mod against {{numParallelSubtasks}}. If {{kafkaTopicPartitions}} has a different order on different subtasks, the assignment is not stable across subtasks, which creates the assignment issues mentioned earlier.
> The fix is also very simple: we should use the partition id to do the mod, i.e. {{if (kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks == indexOfThisSubtask)}}. That would result in a stable assignment across subtasks that is independent of the ordering in the array.
> Marking this as a blocker because of its impact.
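
For illustration, here is a minimal standalone sketch contrasting the buggy check with the
proposed fix. The class and method names are hypothetical, not the actual Flink code:
{code}
class AssignmentPredicateSketch {
    // Buggy (Flink 1.3.1): ownership depends on the list position i, which
    // can differ between subtasks if the list order is not deterministic.
    static boolean buggyOwns(int i, int numParallelSubtasks, int indexOfThisSubtask) {
        return i % numParallelSubtasks == indexOfThisSubtask;
    }

    // Proposed fix: ownership depends only on the partition id, so every
    // subtask computes the same owner regardless of list order.
    static boolean fixedOwns(int partitionId, int numParallelSubtasks, int indexOfThisSubtask) {
        return partitionId % numParallelSubtasks == indexOfThisSubtask;
    }
}
{code}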



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
