flink-user mailing list archives

From aitozi <gjying1...@gmail.com>
Subject KafkaConsumerBase
Date Wed, 02 Aug 2017 13:51:23 GMT

Hi,

I have a question about KafkaConsumerBase. When we use it, each parallel
thread has to fetch data from a different set of partitions, as in this
method from KafkaConsumerBase.java (version 1.2.0):

	protected static List<KafkaTopicPartition> assignPartitions(
			List<KafkaTopicPartition> allPartitions,
			int numConsumers, int consumerIndex) {
		final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
				allPartitions.size() / numConsumers + 1);

		for (int i = 0; i < allPartitions.size(); i++) {
			if (i % numConsumers == consumerIndex) {
				thisSubtaskPartitions.add(allPartitions.get(i));
			}
		}
		
		return thisSubtaskPartitions;
	}

However, I have not found any place that invokes this method. Instead,
KafkaConsumerThread.java uses:

consumerCallBridge.assignPartitions(consumer,
convertKafkaPartitions(subscribedPartitions));

I think subscribedPartitions here is the full list of partitions, not only
the partitions assigned to this subtask. Can anyone clarify this?
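
To make the modulo rule in the quoted method concrete, here is a minimal standalone sketch. It is only an illustration: the class name RoundRobinAssignmentSketch is hypothetical, and plain integers stand in for KafkaTopicPartition so that it runs without Flink or Kafka on the classpath. It says nothing about where Flink actually calls the method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical class for illustration only; not part of Flink.
class RoundRobinAssignmentSketch {

    // Same modulo rule as the quoted assignPartitions, made generic so that
    // integers can stand in for KafkaTopicPartition (an assumption of this sketch).
    static <T> List<T> assignPartitions(
            List<T> allPartitions, int numConsumers, int consumerIndex) {
        final List<T> thisSubtaskPartitions =
                new ArrayList<>(allPartitions.size() / numConsumers + 1);

        for (int i = 0; i < allPartitions.size(); i++) {
            // The partition at list position i goes to subtask (i % numConsumers).
            if (i % numConsumers == consumerIndex) {
                thisSubtaskPartitions.add(allPartitions.get(i));
            }
        }
        return thisSubtaskPartitions;
    }

    public static void main(String[] args) {
        // Five partitions spread over three parallel subtasks.
        List<Integer> allPartitions = Arrays.asList(0, 1, 2, 3, 4);
        int numConsumers = 3;

        for (int subtask = 0; subtask < numConsumers; subtask++) {
            System.out.println("subtask " + subtask + " -> "
                    + assignPartitions(allPartitions, numConsumers, subtask));
        }
        // prints:
        // subtask 0 -> [0, 3]
        // subtask 1 -> [1, 4]
        // subtask 2 -> [2]
    }
}
```

With this rule every partition lands on exactly one subtask, so a list that has already been passed through the method would hold only that subtask's share.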



