incubator-kafka-users mailing list archives

From Bob Jervis
Subject Multi-threaded consumer question
Date Tue, 27 Nov 2012 21:50:35 GMT
I am writing an app that needs to distribute incoming messages to multiple topics and have
those topics read by multiple threads per process in the following way:

First, one process writes messages into many different topics (there is a mapping function
that defines which message goes to which topic).
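The mapping function itself isn't shown. As a rough illustration only, here is a sketch of one way such a function could work, assuming each message carries a string key and topics are named with a hypothetical `topic-` prefix plus an index. `TopicMapper` and `topicFor` are invented names, not part of Kafka.

```java
/**
 * Hypothetical message-to-topic mapping (the real mapping function is not shown).
 * Assumes each message has a String key; topic names are prefix + index.
 */
final class TopicMapper {
    private final String prefix;   // hypothetical topic-name prefix, e.g. "topic-"
    private final int numTopics;   // hypothetical number of target topics

    TopicMapper(String prefix, int numTopics) {
        if (numTopics <= 0) {
            throw new IllegalArgumentException("numTopics must be positive");
        }
        this.prefix = prefix;
        this.numTopics = numTopics;
    }

    /** Deterministically maps a key to one topic: the same key always lands on the same topic. */
    String topicFor(String key) {
        // String.hashCode() is specified by the JDK, so it is stable across JVMs;
        // floorMod keeps the index non-negative even when the hash is negative.
        int index = Math.floorMod(key.hashCode(), numTopics);
        return prefix + index;
    }

    public static void main(String[] args) {
        TopicMapper mapper = new TopicMapper("topic-", 8);
        for (String key : new String[] {"alpha", "beta", "gamma"}) {
            System.out.println(key + " -> " + mapper.topicFor(key));
        }
    }
}
```

Hashing keeps the producer stateless: any process that knows the prefix and topic count computes the same destination for a given key.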

Next, a bank of machines each runs a consumer process.  Each consumer process reads
a subset of the topics.  Each topic is read by two different machines (for redundancy),
but the exact mix of topics and machines may vary and could in principle be adjusted on the
fly (although in my test, each process simply starts reading its topics one at a time until
it is reading from all the topics it cares about).
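To make the two-readers-per-topic layout concrete, here is a sketch of one possible static placement, assuming a fixed list of machines: topic i goes to machines i mod n and (i + 1) mod n. `TopicPlacement`, `place`, and the topic and machine names are hypothetical; the real layout may be computed differently and, as described above, could change on the fly.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical static placement: each topic is read by `copies` distinct machines, round-robin. */
final class TopicPlacement {
    static Map<String, List<String>> place(List<String> topics, List<String> machines, int copies) {
        if (copies < 1 || copies > machines.size()) {
            throw new IllegalArgumentException("need between 1 and " + machines.size() + " copies");
        }
        Map<String, List<String>> readers = new LinkedHashMap<>();
        for (int i = 0; i < topics.size(); i++) {
            List<String> chosen = new ArrayList<>();
            // Consecutive machines (wrapping around) are distinct because copies <= machines.size().
            for (int c = 0; c < copies; c++) {
                chosen.add(machines.get((i + c) % machines.size()));
            }
            readers.put(topics.get(i), chosen);
        }
        return readers;
    }

    public static void main(String[] args) {
        List<String> topics = List.of("topic-0", "topic-1", "topic-2");
        List<String> machines = List.of("host-a", "host-b", "host-c");
        System.out.println(place(topics, machines, 2));
        // {topic-0=[host-a, host-b], topic-1=[host-b, host-c], topic-2=[host-c, host-a]}
    }
}
```

A round-robin offset spreads the load evenly and guarantees the two readers of a topic are different machines whenever at least two machines exist.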

I am creating a separate ConsumerConnector for each topic, then getting a separate KafkaStream
map from each ConsumerConnector, selecting the stream, and starting an iterator on it.
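One iterator per topic only makes progress on every topic if each iterator has its own thread: as far as I know, the high-level consumer's iterator blocks in `hasNext()` while its stream is empty unless `consumer.timeout.ms` is set. Here is a stdlib-only sketch of that thread-per-stream pattern. A `BlockingQueue` stands in for each topic's `KafkaStream`, and a hypothetical `__stop__` marker ends each worker; `StreamPerThread` and `drain` are invented names, not Kafka API.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Stdlib-only model of "one thread per stream". Each BlockingQueue stands in for one
 * topic's stream; take() blocks the way a stream iterator waits for new messages.
 * This is not Kafka code.
 */
final class StreamPerThread {
    static final String STOP = "__stop__"; // hypothetical marker that tells a worker to exit

    /** Drains every stream on its own thread and returns per-topic message counts. */
    static Map<String, Integer> drain(Map<String, BlockingQueue<String>> streams)
            throws InterruptedException {
        Map<String, AtomicInteger> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, streams.size()));
        for (Map.Entry<String, BlockingQueue<String>> entry : streams.entrySet()) {
            AtomicInteger consumed = new AtomicInteger();
            counts.put(entry.getKey(), consumed);
            BlockingQueue<String> stream = entry.getValue();
            pool.execute(() -> {
                try {
                    // A worker blocked here does not stall the workers for other topics.
                    for (String msg = stream.take(); !msg.equals(STOP); msg = stream.take()) {
                        consumed.incrementAndGet(); // real code would process msg here
                    }
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            pool.shutdownNow(); // interrupts any worker still blocked in take()
            throw new IllegalStateException("a worker never received its stop marker");
        }
        Map<String, Integer> result = new TreeMap<>();
        counts.forEach((topic, n) -> result.put(topic, n.get()));
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, BlockingQueue<String>> streams = new TreeMap<>();
        streams.put("topic-a", new LinkedBlockingQueue<>(List.of("m1", "m2", STOP)));
        streams.put("topic-b", new LinkedBlockingQueue<>(List.of("m3", STOP)));
        System.out.println(drain(streams)); // {topic-a=2, topic-b=1}
    }
}
```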

Do I need to manage a single, shared ConsumerConnector instead?  My consumer config defines a groupid
of 'katta-group', but I'm not sure what impact that has.
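On the groupid question: as far as I understand the high-level consumer, the group id names a set of consumers that share a topic's partitions, and each partition is owned by at most one consumer thread in the group at a time (consistent with the "successfully owned partition 0-0" entry in the log later in this message). The following is a simplified, stdlib-only model of that range-style ownership split, not Kafka's actual code; the partition and thread names are hypothetical. In the usage, two machines in the same group share a one-partition topic and only one of them owns it, which suggests that machines meant to read the same topic redundantly would each need their own group id.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model (not Kafka code) of range-style partition ownership inside ONE consumer
 * group: every partition goes to exactly one consumer thread; surplus threads get nothing.
 */
final class GroupAssignment {
    static Map<String, List<String>> assign(List<String> partitions, List<String> consumerThreads) {
        if (consumerThreads.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one consumer thread");
        }
        List<String> parts = new ArrayList<>(partitions);
        List<String> threads = new ArrayList<>(consumerThreads);
        // Every member sorts the same inputs, so all of them compute the same split.
        Collections.sort(parts);
        Collections.sort(threads);
        int perThread = parts.size() / threads.size();
        int withExtra = parts.size() % threads.size(); // the first `withExtra` threads get one more
        Map<String, List<String>> owned = new LinkedHashMap<>();
        for (int i = 0; i < threads.size(); i++) {
            int start = perThread * i + Math.min(i, withExtra);
            int count = perThread + (i < withExtra ? 1 : 0);
            owned.put(threads.get(i), new ArrayList<>(parts.subList(start, start + count)));
        }
        return owned;
    }

    public static void main(String[] args) {
        // Two machines in the same group, one partition ("0-0") for the topic.
        System.out.println(assign(List.of("0-0"), List.of("host-b-0", "host-a-0")));
        // {host-a-0=[0-0], host-b-0=[]}
    }
}
```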

It appears that each iterator starts up and drains whatever messages were in its topic
when the iterator started.  My logs show that the iterators even overlap: one iterator
keeps reading messages while a later iterator for a different topic starts reading as well.

However, once all topics are caught up and the consumer process quiesces (all iterators presumably
waiting for input), if I then run the message generation process to push more data into the topics,
the consumer logs a burst of entries like these:

2012-11-27 13:16:06,489 INFO consumer.ZookeeperConsumerConnector - begin rebalancing consumer try #0
2012-11-27 13:16:06,507 INFO consumer.Fetcher - Cleared all relevant queues for this fetcher
2012-11-27 13:16:06,507 INFO consumer.ConsumerIterator - Clearing the current data chunk for this consumer iterator
2012-11-27 13:16:06,507 INFO consumer.Fetcher - Cleared the data chunks in all the consumer message iterators
2012-11-27 13:16:06,507 INFO consumer.ZookeeperConsumerConnector - Committing all offsets after clearing the fetcher queues
2012-11-27 13:16:06,508 INFO consumer.ZookeeperConsumerConnector - Releasing partition ownership
2012-11-27 13:16:06,508 INFO consumer.ZookeeperConsumerConnector - Consumer rebalancing the following partitions: List(0-0) for topic v1-farsi-0 with consumers: List(
2012-11-27 13:16:06,508 INFO consumer.ZookeeperConsumerConnector - attempting to claim partition
2012-11-27 13:16:06,537 INFO consumer.ZookeeperConsumerConnector - successfully owned partition 0-0 for topic v1-farsi-0
2012-11-27 13:16:06,537 INFO consumer.ZookeeperConsumerConnector - Updating the cache
2012-11-27 13:16:06,538 INFO consumer.ZookeeperConsumerConnector - Consumer selected partitions : v1-farsi-0:0-0: fetched offset = 0: consumed offset = 0
2012-11-27 13:16:06,544 INFO consumer.ZookeeperConsumerConnector - end rebalancing consumer try
2012-11-27 13:16:06,544 INFO consumer.FetcherRunnable - FetchRunnable-0 start fetching topic: v1-farsi-0 part: 0 offset: 0 from

But no messages are read, even though about 500 new messages appear to have been written across a variety
of topics.  If I repeat the experiment, pushing another batch of messages, I get the same
burst of rebalancing log entries (but for a different topic from the one listed above).  It
appears that at least one message was written to the topic mentioned in the rebalancing
log entries, but there are no log entries indicating that the iterator for that topic did
Any ideas?
Bob Jervis | Senior Architect

Seattle | Boston | New York | London
Phone: 425.957.6075 | Fax: 781.404.5711
