activemq-dev mailing list archives

From "Yuriy Sidelnikov (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AMQ-4107) Message order can be broken for Topic under a high load when topicPrefetch=1 and consumer is slow
Date Wed, 07 Nov 2012 09:07:14 GMT

     [ https://issues.apache.org/jira/browse/AMQ-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuriy Sidelnikov updated AMQ-4107:
----------------------------------

    Comment: was deleted

(was: To fix the wrong message order, the following patch can be applied. It synchronizes
the whole message-processing path on matchedListMutex and removes the now-redundant
synchronized blocks nested inside it.
Index: C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
===================================================================
--- C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java

@@ -93,6 +93,7 @@
             return;
         }
         enqueueCounter.incrementAndGet();
+        synchronized (matchedListMutex) {
         if (!isFull() && matched.isEmpty()  && !isSlave()) {
             // if maximumPendingMessages is set we will only discard messages which
             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
@@ -112,7 +113,6 @@
             if (maximumPendingMessages != 0) {
                 boolean warnedAboutWait = false;
                 while (active) {
-                    synchronized (matchedListMutex) {
                         while (matched.isFull()) {
                             if (getContext().getStopping().get()) {
                                 LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
@@ -136,10 +136,7 @@
                         if (matched.tryAddMessageLast(node, 10)) {
                             break;
                         }
-                    }
                 }
-                synchronized (matchedListMutex) {
-
                     // NOTE - be careful about the slaveBroker!
                     if (maximumPendingMessages > 0) {
                         // calculate the high water mark from which point we
@@ -159,13 +156,11 @@
                             pageInSize = Math.max(1000, pageInSize);
                             LinkedList<MessageReference> list = null;
                             MessageReference[] oldMessages=null;
-                            synchronized(matched){
                                 list = matched.pageInList(pageInSize);
                             	oldMessages = messageEvictionStrategy.evictMessages(list);
                             	for (MessageReference ref : list) {
                             	    ref.decrementReferenceCount();
                             	}
-                            }
                             int messagesToEvict = 0;
                             if (oldMessages != null){
 	                            messagesToEvict = oldMessages.length;
@@ -183,10 +178,10 @@
                             }
                         }
                     }
-                }
-                dispatchMatched();
+                 dispatchMatched();
             }
         }
+         }
     }
 
     private boolean isDuplicate(MessageReference node) {
)
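The locking shape of the patch above can be illustrated with a minimal sketch. This is a hypothetical, heavily simplified model and not ActiveMQ code: the class name SerializedAddSketch, the String messages, the inFlight counter, and the ack() method are all assumptions made for illustration. It only shows the idea that the "dispatch directly or queue" decision and the queue update happen under one lock, so a later message cannot overtake an earlier one that is still pending. Having ack() take the same lock is also an assumption of this model.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of a subscription whose add() runs entirely under one mutex. */
class SerializedAddSketch {
    private final Object matchedListMutex = new Object();
    private final Deque<String> matched = new ArrayDeque<>(); // pending messages
    private final List<String> delivered = new ArrayList<>(); // delivery order
    private final int prefetch;
    private int inFlight; // dispatched but not yet acknowledged

    SerializedAddSketch(int prefetch, int inFlight) {
        this.prefetch = prefetch;
        this.inFlight = inFlight;
    }

    /** The fast-path check and the queued path share a single critical section. */
    void add(String msg) {
        synchronized (matchedListMutex) {
            if (inFlight < prefetch && matched.isEmpty()) {
                dispatch(msg);          // nothing pending: direct dispatch is safe
            } else {
                matched.addLast(msg);   // keep arrival order behind pending messages
                dispatchMatched();
            }
        }
    }

    /** Consumer acknowledges one message; pending messages are drained in order. */
    void ack() {
        synchronized (matchedListMutex) {
            inFlight--;
            dispatchMatched();
        }
    }

    // Callers must hold matchedListMutex.
    private void dispatchMatched() {
        while (inFlight < prefetch && !matched.isEmpty()) {
            dispatch(matched.pollFirst());
        }
    }

    // Callers must hold matchedListMutex.
    private void dispatch(String msg) {
        inFlight++;
        delivered.add(msg);
    }

    List<String> delivered() {
        synchronized (matchedListMutex) {
            return new ArrayList<>(delivered);
        }
    }

    /** Prefetch of 1 with one earlier message still in flight, then M1 and M2 arrive. */
    static List<String> demo() {
        SerializedAddSketch sub = new SerializedAddSketch(1, 1);
        sub.add("M1"); // subscription is full, so M1 is queued
        sub.ack();     // earlier message acknowledged, M1 leaves the queue
        sub.add("M2"); // M1 is in flight, so M2 is queued
        sub.ack();     // M1 acknowledged, M2 is dispatched
        return sub.delivered();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the delivery order matches the arrival order because no other thread can observe the state between the isFull-style check and the queue update.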
    
> Message order can be broken for Topic under a high load when topicPrefetch=1 and consumer is slow
> -------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4107
>                 URL: https://issues.apache.org/jira/browse/AMQ-4107
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.6.0
>            Reporter: Yuriy Sidelnikov
>
> For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">
> Short excerpt from TopicSubscription class:
> public void add(MessageReference node) throws Exception {
> …..
>               if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
>             // if maximumPendingMessages is set we will only discard messages which
>             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
>        {color:red}dispatch(node);{color}                   <- The second message goes this way and might be dispatched sooner than the first one.
>             setSlowConsumer(false);
>         } else {
> …….
> if ({color:red}matched.tryAddMessageLast(node, 10)){color} {    <- The first message is put in the VMCursor queue and might be dispatched later.
>         
> break;
>                         }
> .....
>  {color:red}dispatchMatched();{color}   <- The first message won't be dispatched immediately because !isFull() is still false.
> }
> Possible scenario, as far as I can tell from the logs:
> 1. The first message arrives while !isFull() is false, because the consumer has not yet taken a previous message.
> 2. The first message is handled by tryAddMessageLast in the VMPendingMessageCursor class and will be dispatched late, because !isFull() is still false.
> 3. Meanwhile the consumer reads a previous message, and !isFull() now returns true.
> 4. The second message is dispatched immediately and might be delivered first.
> 5. Then the first message is dispatched.
> 6. Message order is broken.
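
The scenario above can be replayed deterministically with a small single-threaded sketch. This is a hypothetical model, not ActiveMQ code: the class OrderRaceSketch, the String messages, and the inFlight counter are assumptions for illustration. It also assumes one particular interleaving that is consistent with the report, namely that the thread handling the first message has already decided to take the queued path but has not yet added the message to the pending queue when the second message runs its fast-path check.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical, simplified replay of the reported interleaving (no real threads). */
class OrderRaceSketch {
    private final int prefetch;
    private int inFlight; // dispatched but not yet acknowledged
    private final Deque<String> matched = new ArrayDeque<>(); // pending queue
    private final List<String> delivered = new ArrayList<>(); // delivery order

    OrderRaceSketch(int prefetch, int inFlight) {
        this.prefetch = prefetch;
        this.inFlight = inFlight;
    }

    boolean isFull() {
        return inFlight >= prefetch;
    }

    void dispatch(String msg) {
        inFlight++;
        delivered.add(msg);
    }

    void ack() {
        inFlight--;
    }

    void dispatchMatched() {
        while (!isFull() && !matched.isEmpty()) {
            dispatch(matched.pollFirst());
        }
    }

    static List<String> replayReportedInterleaving() {
        // topicPrefetch=1 and one earlier message is still unacknowledged.
        OrderRaceSketch sub = new OrderRaceSketch(1, 1);

        // Step 1: thread A evaluates the fast-path condition for M1; it is false.
        boolean m1FastPath = !sub.isFull() && sub.matched.isEmpty();
        // (Assumed) thread A is paused here, before M1 reaches the pending queue.

        // Step 3: the slow consumer acknowledges the earlier message.
        sub.ack();

        // Step 4: thread B evaluates the same condition for M2; it is now true.
        if (!sub.isFull() && sub.matched.isEmpty()) {
            sub.dispatch("M2");
        }

        // Step 2 completes: thread A resumes and queues M1.
        if (!m1FastPath) {
            sub.matched.addLast("M1");
        }
        sub.dispatchMatched(); // no effect: M2 occupies the single prefetch slot

        // Step 5: once M2 is acknowledged, M1 finally leaves the queue.
        sub.ack();
        sub.dispatchMatched();

        // Step 6: delivery order is M2 before M1.
        return sub.delivered;
    }

    public static void main(String[] args) {
        System.out.println(replayReportedInterleaving());
    }
}
```

The sketch only demonstrates that an unsynchronized check-then-act sequence permits this ordering; it does not claim that this is the exact interleaving that occurred in the broker.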

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
