pulsar-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] codelipenghui edited a comment on issue #3131: Consumer stop receive messages from broker
Date Fri, 11 Jan 2019 07:01:30 GMT
codelipenghui edited a comment on issue #3131: Consumer stop receive messages from broker
URL: https://github.com/apache/pulsar/issues/3131#issuecomment-453328896
   ### Broker dispatch process:
   1. The test uses these broker settings: broker.MaxUnackMessagesPerConsumer=100 and
broker.MaxUnackMessagesPerSubscription=500.
   We set up 2 consumers in shared mode with consumer.receiverQueueSize=100. The producer
enables batching with producer.BatchingMaxMessages=2000.
   `PersistentDispatcherMultipleConsumers` has two fixed settings: `MaxReadBatchSize`=100
and `MaxRoundRobinBatchSize`=20.
   2. Once a consumer subscribes to a topic, it sends a flow request with 100 (receiverQueueSize)
permits to the broker to get messages.
   3. By default, the broker reads 100 (MaxReadBatchSize) entries from the ledger and distributes
them to the 2 consumers; each consumer is expected to get 20 (MaxRoundRobinBatchSize) entries.

   4. Once the broker sends messages to the first consumer, the unacked message count becomes
40000 (20 * 2000) and the dispatcher is blocked. The dispatcher's initial available permits
are 200 (2 * consumer.receiverQueueSize); after sending 40000 messages (20 permits) to the
first consumer, the available permits are 200 - 40000 = -39800, and the dispatcher is marked
as blocked.
   5. The client consumer starts receiving messages. After it has received 50 messages, it
reaches the threshold (receiverQueueSize / 2 = 100 / 2) and sends a flow request to the broker
again. But broker.consumer is now blocked by unacked messages, so the broker does not send
any messages to the consumer, and broker.consumer increases permitsReceivedWhileConsumerBlocked.
   6. Since messages have been received but not acked, the consumer sends a redelivery request,
and the dispatcher should release the flow control. But in the logic in [broker.consumer](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L632),
the consumer uses min(permitsReceivedWhileConsumerBlocked, totalRedelivery) to increase the
available permits in the dispatcher.
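
   The permit accounting in steps 1 to 6 can be modelled in a few lines of Python. This is
only a sketch of the arithmetic described above, not the broker's Java code: `DispatcherModel`
and its method names are hypothetical, it folds the per-consumer blocked state and the
dispatcher-level permit counter into one object, and it assumes each dispatched entry costs
one permit per message in the batch.

```python
from dataclasses import dataclass

# Values taken from the test setup in step 1.
RECEIVER_QUEUE_SIZE = 100        # consumer.receiverQueueSize
ROUND_ROBIN_BATCH = 20           # MaxRoundRobinBatchSize, entries per consumer
BATCH_MESSAGES = 2000            # producer.BatchingMaxMessages
MAX_UNACKED_PER_CONSUMER = 100   # broker.MaxUnackMessagesPerConsumer


@dataclass
class DispatcherModel:
    """Hypothetical model of the counters in steps 1-6 (not Pulsar code)."""
    available_permits: int = 0
    unacked: int = 0
    permits_received_while_blocked: int = 0

    @property
    def blocked(self) -> bool:
        # Assumption: blocked once unacked messages reach the per-consumer limit.
        return self.unacked >= MAX_UNACKED_PER_CONSUMER

    def flow(self, permits: int) -> None:
        """Handle a flow request from the client consumer."""
        if self.blocked:
            # Step 5: permits are only remembered, nothing is dispatched.
            self.permits_received_while_blocked += permits
        else:
            self.available_permits += permits

    def dispatch(self, entries: int, messages_per_entry: int) -> None:
        """Step 4: send batched entries; every message counts as unacked."""
        sent = entries * messages_per_entry
        self.unacked += sent
        self.available_permits -= sent

    def redeliver(self, total_redelivery: int) -> int:
        """Step 6: grant min(permitsReceivedWhileConsumerBlocked, totalRedelivery).

        What happens to the blocked-permit counter afterwards is not modelled.
        """
        granted = min(self.permits_received_while_blocked, total_redelivery)
        self.available_permits += granted
        return granted


if __name__ == "__main__":
    d = DispatcherModel()
    d.flow(RECEIVER_QUEUE_SIZE)                     # consumer 1 subscribes
    d.flow(RECEIVER_QUEUE_SIZE)                     # consumer 2 subscribes
    d.dispatch(ROUND_ROBIN_BATCH, BATCH_MESSAGES)   # 20 entries * 2000 messages
    print(d.available_permits, d.blocked)
```

   In this model a single dispatch of 20 batched entries already pushes the permit counter
far below zero, so any later grant capped by min(...) has to cover that whole deficit before
messages can flow again.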
   I think the problem is here: if totalRedelivery < permitsReceivedWhileConsumerBlocked,
the consumer has already sent its flow requests, but the broker can't release the flow control
in time, because the remaining flow permits (received_total - permitsReceivedWhileConsumerBlocked)
are too small compared to the permits in the broker (-39800). The dispatcher will be blocked forever.
   For example, when a consumer receives 10000 messages (5 batch messages, batch size 2000),
it sends 200 flow requests (50 messages per request).
   If permitsReceivedWhileConsumerBlocked = 9000 and the consumer redelivers 2000 messages at
this point, the dispatcher uses 2000 to increase the available permits (the available permits
in the dispatcher are then -8000). The consumer then sends flow requests for the last 1000
messages (< 8000), so the dispatcher stays blocked.
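
   The numbers in this example can be checked with a short Python sketch. The function names
are hypothetical, and the starting deficit of -10000 is inferred from the text (-8000 after
2000 permits are added back); the real broker counters may differ.

```python
def flow_request_count(received_messages: int, messages_per_request: int) -> int:
    """Number of flow requests the client sends while receiving messages."""
    return received_messages // messages_per_request


def permits_after_redelivery(received_total: int,
                             permits_while_blocked: int,
                             total_redelivery: int):
    """Return (available after redelivery, remaining flow permits, final available)."""
    # Assumption: the dispatcher deficit equals the number of messages received.
    available = -received_total
    # The broker only grants min(permitsReceivedWhileConsumerBlocked, totalRedelivery).
    available += min(permits_while_blocked, total_redelivery)
    # Flow permits the client has not yet sent while blocked.
    remaining_flow = received_total - permits_while_blocked
    return available, remaining_flow, available + remaining_flow


if __name__ == "__main__":
    print(flow_request_count(10000, 50))
    print(permits_after_redelivery(10000, 9000, 2000))
```

   Under these assumptions the final permit count is still negative after the last 1000
permits arrive, which matches the dispatcher staying blocked.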
   This is the test log:

