flink-user mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: What happened if my parallelism more than kafka partitions.
Date Wed, 08 Nov 2017 10:57:42 GMT
The `KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks)` method returns the
index of the target subtask for a given Kafka partition.
The implementation in that method ensures that the same subtask index will always be returned
for the same partition.
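To make the mapping concrete, here is a minimal Java sketch that mirrors the arithmetic of the `assign` method quoted further down. It is a simplification under stated assumptions: the hypothetical `assign(String, int, int)` takes the topic name and partition id directly instead of a `KafkaTopicPartition`, and the topic name `example-topic` is made up. Because `String.hashCode()` is fully specified by the Java language, every subtask (in any JVM) computes the same index for the same partition. For a single topic with no more partitions than subtasks, the indices never collide.

```java
import java.util.HashSet;
import java.util.Set;

public class AssignSketch {

    // Hypothetical simplification of the quoted assign(...): same arithmetic,
    // but the topic name and partition id are passed in directly.
    static int assign(String topic, int partition, int numParallelSubtasks) {
        // Topic-dependent start index; the mask keeps the hash non-negative.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // Partition ids are assumed to start at 0 and act as a clockwise offset.
        return (startIndex + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        String topic = "example-topic"; // hypothetical topic name
        int numPartitions = 3;
        int parallelism = 5;

        Set<Integer> used = new HashSet<>();
        for (int p = 0; p < numPartitions; p++) {
            int target = assign(topic, p, parallelism);
            // The method is deterministic: same input, same subtask index.
            if (target != assign(topic, p, parallelism)) {
                throw new AssertionError("assign must be deterministic");
            }
            used.add(target);
            System.out.println("partition " + p + " -> subtask " + target);
        }
        // With parallelism >= numPartitions the offsets cannot wrap onto each
        // other, so every partition lands on a different subtask.
        System.out.println("distinct subtasks used: " + used.size());
    }
}
```

The exact indices depend on the topic's hash, but the three partitions always land on three consecutive (modulo 5) subtask indices.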

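Each consumer subtask will locally invoke this assignment method for each Kafka partition.
If the returned subtask index doesn’t equal the subtask’s index, that partition will be
filtered out and not be read by the subtask. Since every partition maps to exactly one
subtask index, when the parallelism exceeds the number of partitions, the subtasks that
match no partition simply remain idle.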
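The local filtering step can be sketched the same way. This is not Flink's consumer code: `partitionsForSubtask` is a hypothetical helper that plays the role of one subtask checking every partition of a single, made-up topic against its own index, reusing the arithmetic of the quoted `assign` method. With 3 partitions and a parallelism of 5, each partition matches exactly one subtask index, so the two subtasks that match nothing end up with an empty list and stay idle.

```java
import java.util.ArrayList;
import java.util.List;

public class SubtaskFilterSketch {

    // Same arithmetic as the quoted assign(...), simplified (hypothetically)
    // to take the topic name and partition id directly.
    static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    // Hypothetical helper: what one subtask keeps after checking every partition
    // locally. Partitions whose target index differs are filtered out.
    static List<Integer> partitionsForSubtask(String topic, int numPartitions,
                                              int subtaskIndex, int numParallelSubtasks) {
        List<Integer> mine = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (assign(topic, p, numParallelSubtasks) == subtaskIndex) {
                mine.add(p);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        String topic = "example-topic"; // hypothetical topic name
        int numPartitions = 3;
        int parallelism = 5; // more subtasks than partitions

        int idle = 0;
        for (int i = 0; i < parallelism; i++) {
            List<Integer> mine = partitionsForSubtask(topic, numPartitions, i, parallelism);
            System.out.println("subtask " + i + " reads partitions " + mine);
            if (mine.isEmpty()) {
                idle++;
            }
        }
        // Each partition matches exactly one index, so 5 - 3 = 2 subtasks stay idle.
        System.out.println("idle subtasks: " + idle);
    }
}
```

No coordination between subtasks is needed: because every subtask runs the same deterministic function, they all agree on the assignment without exchanging any messages.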

On 8 November 2017 at 6:38:54 PM, yunfan123 (yunfanfighting@foxmail.com) wrote:

The Kafka partition assignment code is as follows:

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // here, the assumption is that the id of Kafka partitions are always
    // starting from 0, and therefore can be used directly as the offset
    // clockwise from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}

It seems it will assign partitions to multiple subtasks.
I wonder how Flink ensures that some subtasks will simply remain idle.

Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/  
