activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1077886 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/transport/ main/java/org/apache/activemq/util/ test/java/org/apache/active...
Date Fri, 04 Mar 2011 11:31:01 GMT
Author: gtully
Date: Fri Mar  4 11:31:01 2011
New Revision: 1077886

URL: http://svn.apache.org/viewvc?rev=1077886&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3193 - fix regression with spring listener test.
IndirectMessageReference cannot implement equals() because doing so breaks composite destination
delivery to a single consumer; it can also break the inflight count, because the ack of one message
removes other messages from dispatched when their message refs match in error.
The fix is to ensure the message ref is passed around such that there is a single ref per message.
Rework some internals to pass the ref rather than the message.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java?rev=1077886&r1=1077885&r2=1077886&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
Fri Mar  4 11:31:01 2011
@@ -22,6 +22,7 @@ import javax.jms.JMSException;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.util.BrokerSupport;
@@ -35,7 +36,8 @@ public class QueueView extends Destinati
     }
 
     public CompositeData getMessage(String messageId) throws OpenDataException {
-        Message rc = ((Queue)destination).getMessage(messageId);
+        QueueMessageReference ref = ((Queue)destination).getMessage(messageId);
+        Message rc = ref.getMessage();
         if (rc == null) {
             return null;
         }
@@ -99,14 +101,13 @@ public class QueueView extends Destinati
      */
     public boolean retryMessage(String messageId) throws Exception {
         Queue queue = (Queue) destination;
-        Message rc = queue.getMessage(messageId);
+        QueueMessageReference ref = queue.getMessage(messageId);
+        Message rc = ref.getMessage();
         if (rc != null) {
-            rc = rc.copy();
-            rc.getMessage().setRedeliveryCounter(0);
             ActiveMQDestination originalDestination = rc.getOriginalDestination();
             if (originalDestination != null) {
                 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
-                return queue.moveMessageTo(context, rc, originalDestination);
+                return queue.moveMessageTo(context, ref, originalDestination);
             }
             else {
                 throw new JMSException("No original destination for message: "+ messageId);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=1077886&r1=1077885&r2=1077886&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
Fri Mar  4 11:31:01 2011
@@ -72,17 +72,6 @@ public class IndirectMessageReference im
         return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null);
     }
 
-    @Override
-    public boolean equals(Object obj) {
-        return this == obj || (obj instanceof IndirectMessageReference &&
-                message.getMessageId().equals(((IndirectMessageReference)obj).getMessage().getMessageId()));
-    }
-
-    @Override
-    public int hashCode() {
-        return message.hashCode();
-    }
-
     public void incrementRedeliveryCounter() {
         message.incrementRedeliveryCounter();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1077886&r1=1077885&r2=1077886&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Mar  4 11:31:01 2011
@@ -353,6 +353,13 @@ public class Queue extends BaseDestinati
     LinkedList<BrowserDispatch> browserDispatches = new LinkedList<BrowserDispatch>();
 
     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(getActiveMQDestination().getQualifiedName() + " add sub: " + sub + ", dequeues: "
+                    + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
+                    + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
+                    + getDestinationStatistics().getInflight().getCount());
+        }
+
         super.addSubscription(context, sub);
         // synchronize with dispatch method so that no new messages are sent
         // while setting up a subscription. avoid out of order messages,
@@ -427,7 +434,7 @@ public class Queue extends BaseDestinati
         pagedInPendingDispatchLock.writeLock().lock();
         try {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: "
+                LOG.debug(getActiveMQDestination().getQualifiedName() + " remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: "
                         + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
                         + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
                         + getDestinationStatistics().getInflight().getCount());
@@ -1058,13 +1065,13 @@ public class Queue extends BaseDestinati
         }
     }
 
-    public Message getMessage(String id) {
+    public QueueMessageReference getMessage(String id) {
         MessageId msgId = new MessageId(id);
         pagedInMessagesLock.readLock().lock();
         try{
-            QueueMessageReference r = this.pagedInMessages.get(msgId);
-            if (r != null) {
-                return r.getMessage();
+            QueueMessageReference ref = this.pagedInMessages.get(msgId);
+            if (ref != null) {
+                return ref;
             }
         }finally {
             pagedInMessagesLock.readLock().unlock();
@@ -1074,15 +1081,12 @@ public class Queue extends BaseDestinati
             try {
                 messages.reset();
                 while (messages.hasNext()) {
-                    MessageReference r = messages.next();
-                    r.decrementReferenceCount();
-                    messages.rollback(r.getMessageId());
-                    if (msgId.equals(r.getMessageId())) {
-                        Message m = r.getMessage();
-                        if (m != null) {
-                            return m;
-                        }
-                        break;
+                    MessageReference mr = messages.next();
+                    QueueMessageReference qmr = createMessageReference(mr.getMessage());
+                    qmr.decrementReferenceCount();
+                    messages.rollback(qmr.getMessageId());
+                    if (msgId.equals(qmr.getMessageId())) {
+                        return qmr;
                     }
                 }
             } finally {
@@ -1261,22 +1265,21 @@ public class Queue extends BaseDestinati
 
     /**
      * Move a message
-     * 
+     *
      * @param context
      *            connection context
      * @param m
-     *            message
+     *            QueueMessageReference
      * @param dest
      *            ActiveMQDestination
      * @throws Exception
      */
-    public boolean moveMessageTo(ConnectionContext context, Message m, ActiveMQDestination dest) throws Exception {
-        QueueMessageReference r = createMessageReference(m);
-        BrokerSupport.resend(context, m, dest);
-        removeMessage(context, r);
+    public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception {
+        BrokerSupport.resend(context, m.getMessage(), dest);
+        removeMessage(context, m);
         messagesLock.writeLock().lock();
         try{
-            messages.rollback(r.getMessageId());
+            messages.rollback(m.getMessageId());
         }finally {
             messagesLock.writeLock().unlock();
         }
@@ -1317,7 +1320,7 @@ public class Queue extends BaseDestinati
     public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
             ActiveMQDestination dest, int maximumMessages) throws Exception {
         int movedCounter = 0;
-        Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
+        Set<QueueMessageReference> set = new CopyOnWriteArraySet<QueueMessageReference>();
         do {
             doPageIn(true);
             pagedInMessagesLock.readLock().lock();
@@ -1326,13 +1329,12 @@ public class Queue extends BaseDestinati
             }finally {
                 pagedInMessagesLock.readLock().unlock();
             }
-            List<MessageReference> list = new ArrayList<MessageReference>(set);
-            for (MessageReference ref : list) {
-                IndirectMessageReference r = (IndirectMessageReference) ref;
-                if (filter.evaluate(context, r)) {
+            List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
+            for (QueueMessageReference ref : list) {
+                if (filter.evaluate(context, ref)) {
                     // We should only move messages that can be locked.
-                    moveMessageTo(context, ref.getMessage(), dest);
-                    set.remove(r);
+                    moveMessageTo(context, ref, dest);
+                    set.remove(ref);
                     if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                         return movedCounter;
                     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=1077886&r1=1077885&r2=1077886&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
Fri Mar  4 11:31:01 2011
@@ -106,7 +106,7 @@ public class InactivityMonitor extends T
         public void run() {
             long now = System.currentTimeMillis();
             if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
-                LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check.");
+                LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check.");
 
             }
             lastRunTime = now;
@@ -142,7 +142,7 @@ public class InactivityMonitor extends T
 
         if (!commandSent.get() && useKeepAlive) {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
+                LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
             }
             ASYNC_TASKS.execute(new Runnable() {
                 public void run() {
@@ -160,7 +160,7 @@ public class InactivityMonitor extends T
             });
         } else {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Message sent since last write check, resetting flag");
+                LOG.trace(this + " message sent since last write check, resetting flag");
             }
         }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java?rev=1077886&r1=1077885&r2=1077886&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
Fri Mar  4 11:31:01 2011
@@ -55,6 +55,7 @@ public final class BrokerSupport {
         message.setDestination(deadLetterDestination);
         message.setTransactionId(null);
         message.setMemoryUsage(null);
+        message.setRedeliveryCounter(0);
         boolean originalFlowControl = context.isProducerFlowControl();
         try {
             context.setProducerFlowControl(false);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1077886&r1=1077885&r2=1077886&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Fri Mar  4 11:31:01 2011
@@ -279,8 +279,6 @@ public class MBeanTest extends EmbeddedB
         assertEquals("browse queue size", initialQueueSize, actualCount);
         
         assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage());
-        assertEquals("dlq still has memory usage", dlqMemUsage, dlq.getMemoryPercentUsage());
-        
     }
 
     public void testMoveMessagesBySelector() throws Exception {



Mime
View raw message