www-infrastructure-issues mailing list archives

From "Chris Riccomini (JIRA)" <j...@apache.org>
Subject [jira] [Closed] (INFRA-6636) BrokerProxy deadlocks if messages aren't polled from all streams
Date Tue, 06 Aug 2013 03:09:47 GMT

     [ https://issues.apache.org/jira/browse/INFRA-6636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel

Chris Riccomini closed INFRA-6636.

    Resolution: Fixed

Wrong project! My bad! :)
> BrokerProxy deadlocks if messages aren't polled from all streams
> ----------------------------------------------------------------
>                 Key: INFRA-6636
>                 URL: https://issues.apache.org/jira/browse/INFRA-6636
>             Project: Infrastructure
>          Issue Type: Bug
>      Security Level: public(Regular issues) 
>            Reporter: Chris Riccomini
> Suppose a KafkaSystemConsumer is created with:
> {code}
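> // Register two SystemStreamPartitions hosted on the same broker.
> consumer.register(sp1, 0)
> consumer.register(sp2, 0)
> consumer.start
> // Only sp2 is ever polled; sp1's queue is never drained.
> while (true) {
>   consumer.poll(sp2, 0)
> }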
> {code}
> This code will eventually deadlock (assuming sp1 has messages) if sp1 and sp2 are both
> on the same broker.
> The current implementation of BrokerProxy/KafkaSystemConsumer puts messages into a
> BlockingEnvelopeMap. The BlockingEnvelopeMap has a per-SystemStreamPartition max queueSize
> (defaults to 1000, I believe). This means that if a SystemStreamPartition is registered with
> the KafkaSystemConsumer, but messages are never read off of that SystemStreamPartition's
> queue, the BrokerProxy/KafkaSystemConsumer will eventually block on BlockingEnvelopeMap.add.
> This prevents the BrokerProxy from fetching any more messages from ANY topic/partition on
> the broker. Code that is trying to read messages from another SystemStreamPartition will
> never receive any new messages.
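
To illustrate the blocking behavior described above, here is a minimal, self-contained sketch.
It is not Samza code: the object name, the queue capacity of 2, and the use of offer with a
timeout (standing in for a truly blocking add, so that the demo terminates) are all assumptions
made for illustration.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical stand-in for one fetcher feeding two bounded per-partition queues.
object HeadOfLineBlockingSketch {
  // Returns (fetch rounds completed before stalling, messages the reader got from sp2).
  def run(capacity: Int = 2): (Int, Int) = {
    val sp1Queue = new ArrayBlockingQueue[String](capacity)
    val sp2Queue = new ArrayBlockingQueue[String](capacity)

    // One fetch round delivers a message to sp1 first, then to sp2.
    // A timed-out offer models the fetcher being stuck on a full queue.
    def fetchRound(round: Int): Boolean =
      sp1Queue.offer(s"sp1-$round", 50, TimeUnit.MILLISECONDS) &&
        sp2Queue.offer(s"sp2-$round", 50, TimeUnit.MILLISECONDS)

    var completedRounds = 0
    var deliveredFromSp2 = 0
    var stalled = false
    while (!stalled && completedRounds < 100) {
      if (fetchRound(completedRounds)) {
        completedRounds += 1
        // The reader only ever drains sp2; sp1's queue keeps filling up.
        while (sp2Queue.poll() != null) deliveredFromSp2 += 1
      } else {
        stalled = true // sp1's queue is full, so sp2 is starved as well
      }
    }
    (completedRounds, deliveredFromSp2)
  }
}
```

With a capacity of 2, run() returns (2, 2): once sp1's queue is full, the third round never
gets as far as sp2, even though sp2's queue is empty and its reader is waiting.
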
> This is not currently a problem because Samza reads in messages in two ways:
> 1) During state store restore.
> 2) During process loop (feeding messages to StreamTask.process).
> The current SamzaContainer implementation uses a new SystemConsumer for each
> SystemStreamPartition when it restores (#1), and each of those consumers registers ONLY one
> SystemStreamPartition, so no deadlock is possible here. The current DefaultChooser
> round-robins between streams, which means that you will always poll from every stream that
> has messages available, so no starvation is currently possible (which means that no deadlock
> is possible).
> Nevertheless, this should be fixed. For one thing, if we changed the DefaultChooser's
> behavior, this problem would surface.
> The simplest solution would be to stop fetching messages in the BrokerProxy for queues
> that are full. An alternative would be to stop fetching messages for any queue that has
> messages already in it (regardless of whether it's "full" or not).
> One nuance to the stop-fetching-on-queue-full solution is that FetchRequest takes a
> fetchSize, which is in bytes. This means that we might get more messages back in one
> FetchRequest than would fit into the BlockingEnvelopeMap queue. We could drop these excess
> messages and re-fetch them later.
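
The two candidate policies above can be compared with a small sketch. The object and method
names here are hypothetical, and the queues are plain java.util.concurrent queues rather than
anything from Samza; the point is only to show which partitions each policy would include in
the next fetch.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

// Hypothetical helpers: decide which partitions to include in the next fetch.
object FetchPolicySketch {
  type Queues = Map[String, BlockingQueue[String]]

  // Policy 1: skip only the partitions whose queue is full.
  def notFull(queues: Queues): Set[String] =
    queues.collect { case (sp, q) if q.remainingCapacity() > 0 => sp }.toSet

  // Policy 2: skip every partition whose queue still holds any message.
  def emptyOnly(queues: Queues): Set[String] =
    queues.collect { case (sp, q) if q.isEmpty => sp }.toSet

  // Three queues of capacity 2: one full, one half full, one empty.
  def demo(): (Set[String], Set[String]) = {
    val full = new LinkedBlockingQueue[String](2)
    full.add("a")
    full.add("b")
    val partial = new LinkedBlockingQueue[String](2)
    partial.add("a")
    val empty = new LinkedBlockingQueue[String](2)
    val queues: Queues = Map("sp1" -> full, "sp2" -> partial, "sp3" -> empty)
    (notFull(queues), emptyOnly(queues))
  }
}
```

In demo(), the first policy would fetch for sp2 and sp3, while the second would fetch for sp3
only. Note that under the first policy sp2 has room for just one more message, which is where
the fetchSize nuance comes in: a byte-sized fetch may return more messages than that.
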
> I think the best solution is just:
> 1) Stop fetching messages for any queue that's not empty.
> 2) Make KafkaSystemConsumer override newBlockingQueue with an unbounded LinkedBlockingQueue.
> The new memory semantics for the KafkaSystemConsumer would be that the LinkedBlockingQueue
> would hold up to N elements, where N is the max number of elements that can theoretically be
> returned in a single FetchRequest for a given TopicAndPartition. Thus, if a
> KafkaSystemConsumer's fetchSize were 1 megabyte, and 1 megabyte could return a theoretical
> maximum of 1 million messages (1 byte per message), then the maximum number of messages
> you'd expect to see in any single unbounded LinkedBlockingQueue would be 1 million. Once this
> queue was drained to zero, a new fetch would be issued.
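
The memory bound described above can be checked with a rough simulation. Everything named here
(the object, maxMessagesPerFetch, simulate, and the fixed message size) is an assumption for
illustration; a real fetch returns variable-sized messages, and this sketch does not talk to
Kafka at all.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical model of "fetch only when the queue is empty" with an unbounded queue.
object FetchWhenEmptySketch {
  // Theoretical cap on messages per fetch: fetch size divided by the smallest message size.
  def maxMessagesPerFetch(fetchSizeBytes: Int, minMessageBytes: Int): Int =
    fetchSizeBytes / minMessageBytes

  // Simulates `polls` reads and returns the deepest the queue ever got.
  def simulate(fetchSizeBytes: Int, messageBytes: Int, polls: Int): Int = {
    val queue = new LinkedBlockingQueue[Array[Byte]]() // unbounded
    var maxDepth = 0
    for (_ <- 1 to polls) {
      if (queue.isEmpty) {
        // Pretend one fetch came back completely full of fixed-size messages.
        val fetched = maxMessagesPerFetch(fetchSizeBytes, messageBytes)
        (1 to fetched).foreach(_ => queue.put(new Array[Byte](messageBytes)))
      }
      maxDepth = math.max(maxDepth, queue.size)
      queue.poll() // the reader takes one message
    }
    maxDepth
  }
}
```

For the numbers in the description, maxMessagesPerFetch(1000000, 1) is 1000000. On a smaller
scale, simulate(1000, 10, 500) returns 100: the queue never grows beyond one fetch's worth of
messages, because a new fetch is only issued once the queue has been drained.
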

This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
