activemq-dev mailing list archives

From "Wei Wei (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AMQ-4052) When consumers was killed and restarted frequently, some messages could not be consumed by consumer but they were still in pending messages.
Date Mon, 17 Sep 2012 03:33:07 GMT

     [ https://issues.apache.org/jira/browse/AMQ-4052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wei Wei updated AMQ-4052:
-------------------------

    Description: 
Symptom:
When consumers are killed and restarted frequently, some messages are never consumed by any consumer, even though they still show up as pending messages.

Reason:
When a consumer consumes messages in a transaction, it sends the ack command and the transaction commit command separately. If the consumer is killed between the two, the broker receives the ack command but never receives the commit command. As a result, the message in PrefetchSubscription's dispatched list is marked as acked, but it is not removed from the dispatched list.

When the consumer is killed, Queue#removeSubscription() is called. That method calls PrefetchSubscription#remove(), which moves the subscription's pending list and dispatched list into the Queue's redeliveredWaitingDispatch list. Later, Queue#doDispatch() passes the redeliveredWaitingDispatch list to Queue#doActualDispatch() so that the messages can be dispatched to a consumer again.

In Queue#doActualDispatch(), if isFull() returns true for every consumer, no target is found (target is null), and the expression "interestCount > 0" becomes the only condition that decides whether the message is put back into redeliveredWaitingDispatch. Because the message's isAcked() method still returns true, interestCount is 0, so the message is not put back into redeliveredWaitingDispatch. Yet the message was never consumed and remains in pagedInMessages, where nothing will dispatch it again.
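To make the sequence above concrete, here is a minimal, hypothetical Java model of it. ModelMsg, ModelSub and ModelQueue are invented for illustration and are not ActiveMQ classes. The rule that a consumer counts towards interestCount only for a message whose ack flag is false is taken from the analysis above, not from the broker source, so treat it as an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the broker-side sequence described above.
// None of these classes are ActiveMQ APIs.
class ModelMsg {
    final String id;
    boolean acked;                       // stands in for QueueMessageReference#isAcked()
    ModelMsg(String id) { this.id = id; }
}

class ModelSub {
    final boolean full;                  // stands in for Subscription#isFull()
    final List<ModelMsg> dispatched = new ArrayList<>();
    ModelSub(boolean full) { this.full = full; }

    // Transactional ack: the message is flagged but stays on the dispatched list.
    void onAck(ModelMsg m) { m.acked = true; }

    // Transaction commit: only now is the message removed from the dispatched list.
    void onCommit(ModelMsg m) { dispatched.remove(m); }

    // Consumer killed: the dispatched list is handed back with the ack flag unchanged.
    List<ModelMsg> remove() {
        List<ModelMsg> back = new ArrayList<>(dispatched);
        dispatched.clear();
        return back;
    }
}

class ModelQueue {
    // Returns the messages put back on redeliveredWaitingDispatch.
    // Assumption (from the report): an acked message attracts no consumer interest.
    static List<ModelMsg> doActualDispatch(List<ModelMsg> redelivered, List<ModelSub> consumers) {
        List<ModelMsg> putBack = new ArrayList<>();
        for (ModelMsg m : redelivered) {
            ModelSub target = null;
            int interestCount = 0;
            for (ModelSub s : consumers) {
                if (m.acked) continue;                // no consumer selects an acked message
                if (!s.full) { target = s; break; }   // would be dispatched to s here
                interestCount++;                      // interested, but s is full
            }
            if (target == null && interestCount > 0) {
                putBack.add(m);
            }
            // target == null and interestCount == 0: the message is neither dispatched
            // nor put back, so it is left only in pagedInMessages.
        }
        return putBack;
    }
}
```

In this model, a message that received onAck() but never onCommit() comes back from remove() still flagged as acked; with every remaining consumer full, doActualDispatch() returns an empty list for it, whereas an un-acked message under the same conditions is put back.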

Solution:
In PrefetchSubscription#remove(), reset the ack status to false for every message in the dispatched list before the list is handed back to the queue. The code is as follows:

    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
        List<MessageReference> resultList = new ArrayList<MessageReference>();
        synchronized(pendingLock) {
            super.remove(context, destination);
            // Here is a potential problem concerning Inflight stat:
            // Messages not already committed or rolled back may not be removed from dispatched list at the moment
            // Except if each commit or rollback callback action comes before remove of subscriber.
            resultList.addAll(pending.remove(context, destination));

            // Synchronized to DispatchLock
            synchronized(dispatchLock) {
                ArrayList<MessageReference> references = new ArrayList<MessageReference>();
                for (MessageReference msgRef : dispatched) {
                    if (msgRef.getRegionDestination() == destination) {
                        references.add(msgRef);
                        // Proposed change: clear any ack that arrived without its transaction
                        // commit, so that the queue treats the message as deliverable again.
                        if (msgRef instanceof QueueMessageReference) {
                            QueueMessageReference ref = (QueueMessageReference) msgRef;
                            ref.setAcked(false);
                        }
                    }
                }
                resultList.addAll(references);
                destination.getDestinationStatistics().getDispatched().subtract(references.size());
                destination.getDestinationStatistics().getInflight().subtract(references.size());
                dispatched.removeAll(references);
            }
        }

        return resultList;
    }
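The effect of the proposed change can be sketched with a small, hypothetical Java model. FixedMsg, FixedSub and FixedQueue are invented names, not ActiveMQ classes; putBackWhenAllFull() only encodes the put-back rule described above for the case where every consumer is full (at least one consumer assumed), under the same assumption that a consumer counts towards interestCount only for un-acked messages.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not ActiveMQ code) of the proposed fix: remove() clears
// the ack flag of every message still on the dispatched list before returning it.
class FixedMsg {
    final String id;
    boolean acked;
    FixedMsg(String id) { this.id = id; }
}

class FixedSub {
    final List<FixedMsg> dispatched = new ArrayList<>();

    List<FixedMsg> remove() {
        List<FixedMsg> back = new ArrayList<>(dispatched);
        for (FixedMsg m : back) {
            m.acked = false;            // mirrors ref.setAcked(false) in the patch
        }
        dispatched.clear();
        return back;
    }
}

class FixedQueue {
    // All consumers are full, so no target exists: a message is put back on
    // redeliveredWaitingDispatch only if interestCount > 0. Assumption from the
    // report: each full consumer counts only for an un-acked message.
    static List<FixedMsg> putBackWhenAllFull(List<FixedMsg> redelivered, int fullConsumers) {
        List<FixedMsg> putBack = new ArrayList<>();
        for (FixedMsg m : redelivered) {
            int interestCount = m.acked ? 0 : fullConsumers;
            if (interestCount > 0) {
                putBack.add(m);
            }
        }
        return putBack;
    }
}
```

In this toy model, a message whose ack arrived but whose commit was lost leaves remove() with acked == false and is therefore put back for redelivery. It also shows what the change implies: any uncommitted ack still on the dispatched list when remove() runs is discarded, which relies on commit and rollback callbacks not racing with remove(), the same concern raised by the existing Inflight comment.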


This change solves the problem. I would like to know whether it could lead to other problems.

> When consumers was killed and restarted frequently, some messages could not be consumed by consumer but they were still in pending messages.
> --------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4052
>                 URL: https://issues.apache.org/jira/browse/AMQ-4052
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.6.0
>         Environment: CentOS release 5.6 (Final) java version "1.6.0_30"
>            Reporter: Wei Wei
>

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
