activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-4107 - apply patch - resolved intermittent failure of TwoMulticastDiscoveryBrokerTopicSendReceiveTest
Date Mon, 12 Jan 2015 12:01:13 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 3e007d89a -> 8dbb48a23


https://issues.apache.org/jira/browse/AMQ-4107 - apply patch - resolved intermittent failure
of TwoMulticastDiscoveryBrokerTopicSendReceiveTest


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8dbb48a2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8dbb48a2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8dbb48a2

Branch: refs/heads/trunk
Commit: 8dbb48a23fc6170afd5c3c855e8d385297042e57
Parents: 3e007d8
Author: gtully <gary.tully@gmail.com>
Authored: Mon Jan 12 11:59:51 2015 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Mon Jan 12 12:00:42 2015 +0000

----------------------------------------------------------------------
 .../broker/region/TopicSubscription.java        | 41 +++++++++-----------
 ...castDiscoveryBrokerTopicSendReceiveTest.java |  2 +-
 2 files changed, 20 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8dbb48a2/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 6b61379..8db7c62 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -106,26 +106,26 @@ public class TopicSubscription extends AbstractSubscription {
         // locator /w the message.
         node = new IndirectMessageReference(node.getMessage());
         enqueueCounter.incrementAndGet();
-        if (!isFull() && matched.isEmpty()) {
-            // if maximumPendingMessages is set we will only discard messages which
-            // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
-            dispatch(node);
-            setSlowConsumer(false);
-        } else {
-            if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
-                // Slow consumers should log and set their state as such.
-                if (!isSlowConsumer()) {
-                    LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString());
-                    setSlowConsumer(true);
-                    for (Destination dest: destinations) {
-                        dest.slowConsumer(getContext(), this);
+        synchronized (matchedListMutex) {
+            if (!isFull() && matched.isEmpty()) {
+                // if maximumPendingMessages is set we will only discard messages which
+                // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
+                dispatch(node);
+                setSlowConsumer(false);
+            } else {
+                if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
+                    // Slow consumers should log and set their state as such.
+                    if (!isSlowConsumer()) {
+                        LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString());
+                        setSlowConsumer(true);
+                        for (Destination dest: destinations) {
+                            dest.slowConsumer(getContext(), this);
+                        }
                     }
                 }
-            }
-            if (maximumPendingMessages != 0) {
-                boolean warnedAboutWait = false;
-                while (active) {
-                    synchronized (matchedListMutex) {
+                if (maximumPendingMessages != 0) {
+                    boolean warnedAboutWait = false;
+                    while (active) {
                         while (matched.isFull()) {
                             if (getContext().getStopping().get()) {
                                 LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId());
@@ -150,9 +150,6 @@ public class TopicSubscription extends AbstractSubscription {
                             break;
                         }
                     }
-                }
-                synchronized (matchedListMutex) {
-                    // NOTE - be careful about the slaveBroker!
                     if (maximumPendingMessages > 0) {
                         // calculate the high water mark from which point we
                         // will eagerly evict expired messages
@@ -195,8 +192,8 @@ public class TopicSubscription extends AbstractSubscription {
                             }
                         }
                     }
+                    dispatchMatched();
                 }
-                dispatchMatched();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8dbb48a2/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoMulticastDiscoveryBrokerTopicSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoMulticastDiscoveryBrokerTopicSendReceiveTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoMulticastDiscoveryBrokerTopicSendReceiveTest.java
index 8b8643a..f05f993 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoMulticastDiscoveryBrokerTopicSendReceiveTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoMulticastDiscoveryBrokerTopicSendReceiveTest.java
@@ -21,7 +21,7 @@ import javax.jms.JMSException;
 import org.apache.activemq.ActiveMQConnectionFactory;
 
 /**
- * 
+ *  reproduced: https://issues.apache.org/jira/browse/AMQ-4107
  */
 public class TwoMulticastDiscoveryBrokerTopicSendReceiveTest extends TwoBrokerTopicSendReceiveTest {
 


Mime
View raw message