activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r640890 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: ./ cursors/
Date Tue, 25 Mar 2008 16:45:10 GMT
Author: chirino
Date: Tue Mar 25 09:45:00 2008
New Revision: 640890

URL: http://svn.apache.org/viewvc?rev=640890&view=rev
Log:
In the queue case, when a consumer was closed it was not properly re-delivering messages to
other available consumers.  This was causing message to look like they got dropped.  

 - When we shut a queue sub down we now get it's pending+dispatched list and re-dispatch that
to the other available subscriptions.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
Tue Mar 25 09:45:00 2008
@@ -17,6 +17,9 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import javax.jms.InvalidSelectorException;
@@ -104,8 +107,9 @@
         destinations.add(destination);
     }
 
-    public void remove(ConnectionContext context, Destination destination) throws Exception
{
+    public List<MessageReference> remove(ConnectionContext context, Destination destination)
throws Exception {
         destinations.remove(destination);
+        return Collections.EMPTY_LIST;
     }
 
     public ConsumerInfo getConsumerInfo() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Tue Mar 25 09:45:00 2008
@@ -436,11 +436,18 @@
         }
     }
 
-    public void remove(ConnectionContext context, Destination destination) throws Exception
{
+    public List<MessageReference> remove(ConnectionContext context, Destination destination)
throws Exception {
+        List<MessageReference> rc = new ArrayList<MessageReference>();
         synchronized(pendingLock) {
             super.remove(context, destination);
-            pending.remove(context, destination);
+            for (MessageReference r : dispatched) {
+                if( r.getRegionDestination() == destination ) {
+                    rc.add((QueueMessageReference)r);
+                }
+            }
+            rc.addAll(pending.remove(context, destination));
         }
+        return rc;
     }
 
     protected void dispatchPending() throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Mar 25 09:45:00 2008
@@ -292,25 +292,20 @@
                 ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
                 MessageGroupSet ownedGroups = getMessageGroupOwners()
                         .removeConsumer(consumerId);
+                
                 // redeliver inflight messages
-                sub.remove(context, this);
-
                 List<QueueMessageReference> list = new ArrayList<QueueMessageReference>();
-                List<QueueMessageReference> inFlight = null;
-                synchronized(pagedInMessages) {
-                    inFlight = new ArrayList<QueueMessageReference>(pagedInMessages.values());
-                }
-                
-                for (QueueMessageReference node:inFlight){
-                    if (!node.isDropped() && !node.isAcked()
-                            && node.getLockOwner() == sub) {
-                        if (node.unlock()) {
-                            node.incrementRedeliveryCounter();
-                            list.add(node);
-                        }
+                for (MessageReference ref : sub.remove(context, this)) {
+                    QueueMessageReference qmr = (QueueMessageReference)ref;
+                    qmr.incrementRedeliveryCounter();
+                    if( qmr.getLockOwner()==sub ) {
+                        qmr.unlock();
+                        qmr.incrementRedeliveryCounter();
                     }
+                    list.add(qmr);
                 }
-                if (list != null && !consumers.isEmpty()) {
+                
+                if (!list.isEmpty() && !consumers.isEmpty()) {
                     doDispatch(list);
                 }
             }
@@ -938,6 +933,7 @@
                 if( rd.subscription instanceof QueueBrowserSubscription ) {
                     ((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
                 }
+                
             } catch (Exception e) {
                 e.printStackTrace();
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
Tue Mar 25 09:45:00 2008
@@ -87,8 +87,9 @@
      * The subscription will be no longer be receiving messages from the destination.
      * @param context 
      * @param destination
+     * @return a list of un-acked messages that were added to the subscription.
      */
-    void remove(ConnectionContext context, Destination destination) throws Exception;
+    List<MessageReference> remove(ConnectionContext context, Destination destination)
throws Exception;
     
     /**
      * The ConsumerInfo object that created the subscription.

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Tue Mar 25 09:45:00 2008
@@ -16,12 +16,15 @@
  */
 package org.apache.activemq.broker.region.cursors;
 
+import java.util.Collections;
 import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.QueueMessageReference;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.usage.SystemUsage;
 
@@ -59,7 +62,9 @@
     public void add(ConnectionContext context, Destination destination) throws Exception
{
     }
 
-    public void remove(ConnectionContext context, Destination destination) throws Exception
{
+    @SuppressWarnings("unchecked")
+    public List<MessageReference> remove(ConnectionContext context, Destination destination)
throws Exception {
+        return Collections.EMPTY_LIST;
     }
 
     public boolean isRecoveryRequired() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Tue Mar 25 09:45:00 2008
@@ -110,6 +110,8 @@
         }
         return isDiskListEmpty();
     }
+    
+    
 
     /**
      * reset the cursor

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
Tue Mar 25 09:45:00 2008
@@ -18,12 +18,14 @@
 
 import java.io.IOException;
 import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.QueueMessageReference;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.usage.SystemUsage;
 
@@ -51,7 +53,7 @@
      * @param destination
      * @throws Exception
      */
-    void remove(ConnectionContext context, Destination destination) throws Exception;
+    List<MessageReference> remove(ConnectionContext context, Destination destination)
throws Exception;
 
     /**
      * @return true if there are no pending messages

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Tue Mar 25 09:45:00 2008
@@ -17,9 +17,11 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.activemq.advisory.AdvisorySupport;
@@ -27,6 +29,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.QueueMessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;
@@ -128,11 +131,12 @@
      * @param destination
      * @throws Exception
      */
-    public synchronized void remove(ConnectionContext context, Destination destination) throws
Exception {
+    public synchronized List<MessageReference> remove(ConnectionContext context, Destination
destination) throws Exception {
         Object tsp = topics.remove(destination);
         if (tsp != null) {
             storePrefetches.remove(tsp);
         }
+        return Collections.EMPTY_LIST;
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
Tue Mar 25 09:45:00 2008
@@ -16,8 +16,13 @@
  */
 package org.apache.activemq.broker.region.cursors;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.QueueMessageReference;
 
@@ -32,6 +37,18 @@
     private Iterator<MessageReference> iter;
     private MessageReference last;
 
+    
+    @Override
+    public List<MessageReference> remove(ConnectionContext context, Destination destination)
throws Exception {
+        List<MessageReference> rc = new ArrayList<MessageReference>();
+        for (MessageReference r : list) {
+            if( r.getRegionDestination()==destination ) {
+                rc.add(r);
+            }
+        }
+        return rc ;        
+    }
+    
     /**
      * @return true if there are no pending messages
      */



Mime
View raw message