flink-user mailing list archives

From "r. r." <rob...@abv.bg>
Subject Re: all task managers reading from all kafka partitions
Date Fri, 17 Nov 2017 18:54:43 GMT
Hi

It's Flink 1.3.2, Kafka 0.10.2.0.

I am starting 1 JM and 4 TMs (with 1 task slot each). Then I deploy the job 4 
times (via ./flink run -p1 x.jar), so each job's parallelism is 1.
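A minimal sketch of why every job reports all 10 partitions. It assumes, as with the Flink 0.9/0.10 Kafka connectors, that the Flink source splits a topic's partitions among its own parallel subtasks instead of letting Kafka's group coordinator do it, and it assumes a simple index-modulo-parallelism split (the exact scheme differs between Flink versions). `assign_partitions` is a hypothetical name used only for this model; its output for `-p1` matches the "Consumer subtask 0 will start reading the following 10 partitions" log line in the quoted message.

```python
# Hypothetical model: a Flink Kafka source splits the topic's partitions among
# its own parallel subtasks (simple index-modulo-parallelism split assumed).
def assign_partitions(num_partitions, parallelism, subtask_index):
    """Return the partitions that subtask `subtask_index` of one job would read."""
    return [p for p in range(num_partitions) if p % parallelism == subtask_index]


NUM_PARTITIONS = 10  # TopicToConsume has 10 partitions

# Four independent jobs, each started with -p1: each job has only subtask 0,
# so each job is handed all 10 partitions, regardless of group.id.
jobs = {f"job{j}": assign_partitions(NUM_PARTITIONS, 1, 0) for j in range(4)}
for name, partitions in jobs.items():
    print(name, partitions)

# A single job started with -p4: the 10 partitions are split across 4 subtasks.
for subtask in range(4):
    print("subtask", subtask, assign_partitions(NUM_PARTITIONS, 4, subtask))
```

Under this model, four separate `-p1` jobs each read every partition, while one job started with `-p4` spreads the 10 partitions over its 4 subtasks so that each partition is read once.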



A new thing I just noticed: if I start two kafka-console-consumer instances 
(with --consumer-property group.id=TopicConsumers) in parallel to the Flink 
jobs and write a message to Kafka, then one of the console consumers 
receives the message, together with both Flink jobs.
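For comparison, the console consumers do go through Kafka's group coordinator, which hands each partition to exactly one member of the group. The simplified Python model below follows the range strategy (the `RangeAssignor` that also appears as `partition.assignment.strategy` in the log below) for a single topic; `range_assign` and the member ids are hypothetical.

```python
# Simplified model of Kafka's range assignment for a single topic.
def range_assign(num_partitions, member_ids):
    """Members are sorted; each gets a contiguous block of partitions, and the
    first (num_partitions % len(members)) members get one extra partition."""
    members = sorted(member_ids)
    per_member, extra = divmod(num_partitions, len(members))
    assignment = {}
    start = 0
    for i, member in enumerate(members):
        count = per_member + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


# Two console consumers in group TopicConsumers, topic with 10 partitions.
assignment = range_assign(10, ["console-consumer-1", "console-consumer-2"])
print(assignment)

# A message written to partition 7 is seen by exactly one group member.
receivers = [m for m, parts in assignment.items() if 7 in parts]
print(receivers)  # ['console-consumer-2']
```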

I thought maybe the Flink consumers didn't receive the group property 
passed via "flink run .. --group.id TopicConsumers", but no, they do 
belong to the group as well:



taskmanager_3  | 2017-11-17 18:29:00,750 INFO  org.apache.kafka.clients.consumer.ConsumerConfig              - ConsumerConfig values: 
taskmanager_3  |     auto.commit.interval.ms = 5000
taskmanager_3  |     auto.offset.reset = latest
taskmanager_3  |     bootstrap.servers = [kafka:9092]
taskmanager_3  |     check.crcs = true
taskmanager_3  |     client.id = 
taskmanager_3  |     connections.max.idle.ms = 540000
taskmanager_3  |     enable.auto.commit = true
taskmanager_3  |     exclude.internal.topics = true
taskmanager_3  |     fetch.max.bytes = 52428800
taskmanager_3  |     fetch.max.wait.ms = 500
taskmanager_3  |     fetch.min.bytes = 1
taskmanager_3  |     group.id = TopicConsumers
taskmanager_3  |     heartbeat.interval.ms = 3000
taskmanager_3  |     interceptor.classes = null
taskmanager_3  |     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
taskmanager_3  |     max.partition.fetch.bytes = 1048576
taskmanager_3  |     max.poll.interval.ms = 300000
taskmanager_3  |     max.poll.records = 500
taskmanager_3  |     metadata.max.age.ms = 300000
taskmanager_3  |     metric.reporters = []
taskmanager_3  |     metrics.num.samples = 2
taskmanager_3  |     metrics.recording.level = INFO
taskmanager_3  |     metrics.sample.window.ms = 30000
taskmanager_3  |     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
taskmanager_3  |     receive.buffer.bytes = 65536
taskmanager_3  |     reconnect.backoff.ms = 50
taskmanager_3  |     request.timeout.ms = 305000
taskmanager_3  |     retry.backoff.ms = 100
taskmanager_3  |     sasl.jaas.config = null
taskmanager_3  |     sasl.kerberos.kinit.cmd = /usr/bin/kinit
taskmanager_3  |     sasl.kerberos.min.time.before.relogin = 60000
taskmanager_3  |     sasl.kerberos.service.name = null
taskmanager_3  |     sasl.kerberos.ticket.renew.jitter = 0.05
taskmanager_3  |     sasl.kerberos.ticket.renew.window.factor = 0.8
taskmanager_3  |     sasl.mechanism = GSSAPI
taskmanager_3  |     security.protocol = PLAINTEXT
taskmanager_3  |     send.buffer.bytes = 131072
taskmanager_3  |     session.timeout.ms = 10000
taskmanager_3  |     ssl.cipher.suites = null
taskmanager_3  |     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
taskmanager_3  |     ssl.endpoint.identification.algorithm = null
taskmanager_3  |     ssl.key.password = null
taskmanager_3  |     ssl.keymanager.algorithm = SunX509
taskmanager_3  |     ssl.keystore.location = null
taskmanager_3  |     ssl.keystore.password = null
taskmanager_3  |     ssl.keystore.type = JKS
taskmanager_3  |     ssl.protocol = TLS
taskmanager_3  |     ssl.provider = null
taskmanager_3  |     ssl.secure.random.implementation = null
taskmanager_3  |     ssl.trustmanager.algorithm = PKIX
taskmanager_3  |     ssl.truststore.location = null
taskmanager_3  |     ssl.truststore.password = null
taskmanager_3  |     ssl.truststore.type = JKS
taskmanager_3  |     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
taskmanager_3  | 
taskmanager_3  | 2017-11-17 18:29:00,765 WARN  org.apache.kafka.clients.consumer.ConsumerConfig              - The configuration 'topic' was supplied but isn't a known config.
taskmanager_3  | 2017-11-17 18:29:00,765 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version : 0.10.2.1
taskmanager_3  | 2017-11-17 18:29:00,770 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId : e89bffd6b2eff799
taskmanager_3  | 2017-11-17 18:29:00,791 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator kafka:9092 (id: 2147482646 rack: null) for group TopicConsumers.
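The `WARN` about `'topic'` in the log suggests that every program argument, including an assumed `--topic TopicToConsume`, ends up in the properties handed to the Kafka consumer, as happens when a job passes something like `ParameterTool.fromArgs(args).getProperties()` straight to the connector. That is an assumption about this job; the sketch below only models the argument-to-properties step, and `args_to_properties` and `KNOWN_CONSUMER_KEYS` are hypothetical names.

```python
# Hypothetical model of turning "--key value" program arguments into the
# flat properties map that a job might hand straight to the Kafka consumer.
def args_to_properties(argv):
    if len(argv) % 2 != 0:
        raise ValueError("expected --key value pairs")
    props = {}
    for key, value in zip(argv[0::2], argv[1::2]):
        if not key.startswith("--"):
            raise ValueError(f"expected --key, got {key!r}")
        props[key[2:]] = value
    return props


# Small illustrative subset of real Kafka consumer config keys.
KNOWN_CONSUMER_KEYS = {"bootstrap.servers", "group.id", "auto.offset.reset"}

# Assumed program arguments; only --group.id appears in the message itself.
props = args_to_properties(
    ["--topic", "TopicToConsume",
     "--bootstrap.servers", "kafka:9092",
     "--group.id", "TopicConsumers"]
)
unknown = sorted(k for k in props if k not in KNOWN_CONSUMER_KEYS)
print(props["group.id"])  # TopicConsumers
print(unknown)            # ['topic'], the key the Kafka client warns about
```

The Kafka client only logs a warning for unknown keys, so an extra `topic` entry is harmless; the log also confirms that `group.id` reaches the consumer as intended.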





I'm running Kafka and the Flink jobs in Docker containers, and the console consumers from localhost.







> -------- Original message --------
> From: Gary Yao gary@data-artisans.com
> Subject: Re: all task managers reading from all kafka partitions
> To: "r. r." <robert@abv.bg>
> Sent on: 17.11.2017 20:02
>
> Hi Robert,
>
> Can you tell us which Flink version you are using?
>
> Also, are you starting a single job with parallelism 4 or are you starting several jobs?
>
> Thanks!
>
> Gary
>
> On Fri, Nov 17, 2017 at 4:41 PM, r. r. <robert@abv.bg> wrote:
>
> > Hi
> >
> > I have this strange problem: 4 task managers, each with one task slot, are attached to the same Kafka topic, which has 10 partitions.
> >
> > When I post a single message to the Kafka topic, it seems that all 4 consumers fetch the message and start processing it (confirmed by the TM logs).
> >
> > If I run kafka-consumer-groups.sh --describe --group TopicConsumers, it says that only one message was posted, to a single partition. The next message would generally go to another partition.
> >
> > In addition, while the Flink jobs are processing the message, I start two kafka-console-consumer.sh instances, and each gets only one message, as expected.
> >
> > On startup, each Flink TM logs something that, to me, reads as if it will read from all partitions:
> >
> > 2017-11-17 15:03:38,688 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Got 10 partitions from these topics: [TopicToConsume]
> > 2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Consumer is going to read the following topics (with number of partitions): TopicToConsume (10),
> > 2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 10 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='TopicToConsume', partition=8}, KafkaTopicPartition{topic='TopicToConsume', partition=9}, KafkaTopicPartition{topic='TopicToConsume', partition=6}, KafkaTopicPartition{topic='TopicToConsume', partition=7}, KafkaTopicPartition{topic='TopicToConsume', partition=4}, KafkaTopicPartition{topic='TopicToConsume', partition=5}, KafkaTopicPartition{topic='TopicToConsume', partition=2}, KafkaTopicPartition{topic='TopicToConsume', partition=3}, KafkaTopicPartition{topic='TopicToConsume', partition=0}, KafkaTopicPartition{topic='TopicToConsume', partition=1}]
> > 2017-11-17 15:03:38,699 INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
> >     auto.commit.interval.ms = 5000
> >     auto.offset.reset = latest
> >
> > Any hints?
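The observation that a single message lands in one partition and the next one usually in another fits how the Kafka 0.10 producer's default partitioner treats records without a key: it advances a counter and takes it modulo the number of available partitions (keyed records are placed by a hash of the key instead). A simplified Python model, assuming keyless messages and all 10 partitions available; `RoundRobinPartitioner` is a hypothetical name:

```python
import itertools


class RoundRobinPartitioner:
    """Simplified model of keyless partitioning: a counter taken modulo the
    number of available partitions (assumes all partitions are available)."""

    def __init__(self, num_partitions, start=0):
        self.num_partitions = num_partitions
        # The real client's starting point is not fixed; `start` makes the model deterministic.
        self._counter = itertools.count(start)

    def partition(self):
        return next(self._counter) % self.num_partitions


partitioner = RoundRobinPartitioner(num_partitions=10, start=8)
print([partitioner.partition() for _ in range(4)])  # [8, 9, 0, 1]
```

Whichever partition a message lands in, every `-p1` Flink job whose subtask 0 holds all 10 partitions reads it.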
