activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r736720 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: Queue.java QueueDispatchSelector.java
Date Thu, 22 Jan 2009 17:22:15 GMT
Author: dejanb
Date: Thu Jan 22 09:22:12 2009
New Revision: 736720

URL: http://svn.apache.org/viewvc?rev=736720&view=rev
Log:
Recovery dispatch refactoring as part of the solution for https://issues.apache.org/activemq/browse/AMQ-2016

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=736720&r1=736719&r2=736720&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	Thu Jan 22 09:22:12 2009
@@ -233,16 +233,18 @@
                 }
             }
             
-            // any newly paged in messages that are not dispatched are added to pagedInPending in iterate()
-            doPageIn(false);
+            // do recovery dispatch only if it is a browser subscription
+            if(sub instanceof QueueBrowserSubscription ) { 
+            	// any newly paged in messages that are not dispatched are added to pagedInPending in iterate()
+            	doPageIn(false);
+            
+            	synchronized (pagedInMessages) {
+            		RecoveryDispatch rd = new RecoveryDispatch();
+            		rd.messages =  new ArrayList<QueueMessageReference>(pagedInMessages.values());
+            		rd.subscription = sub;
+            		recoveries.addLast(rd);
+            	}
             
-            synchronized (pagedInMessages) {
-                RecoveryDispatch rd = new RecoveryDispatch();
-                rd.messages =  new ArrayList<QueueMessageReference>(pagedInMessages.values());
-                rd.subscription = sub;
-                recoveries.addLast(rd);
-            }
-            if( sub instanceof QueueBrowserSubscription ) {
                 ((QueueBrowserSubscription)sub).incrementQueueRef();
             }
             if (!(this.optimizedDispatch || isSlave())) {
@@ -303,9 +305,14 @@
                     doDispatch(list);
                 }
             }
-
-            if (consumers.isEmpty()) {
-                messages.gc();
+            //if it is a last consumer (and not a browser) dispatch all pagedIn messages
+            if (consumers.isEmpty() && !(sub instanceof QueueBrowserSubscription)) {
+            		List<QueueMessageReference> list = new ArrayList<QueueMessageReference>();
+            		for (QueueMessageReference ref : pagedInMessages.values()) {
+            			list.add(ref);
+            		}
+            		pagedInPendingDispatch.clear();
+            		doDispatch(list);
             }
             if (!(this.optimizedDispatch || isSlave())) {
                 wakeup();
@@ -615,6 +622,7 @@
         int count = 0;
         List<Message> l = new ArrayList<Message>();
         try {
+            pageInMessages(false);
             synchronized (this.pagedInPendingDispatch) {
                 for (Iterator<QueueMessageReference> i = this.pagedInPendingDispatch
                         .iterator(); i.hasNext()
@@ -657,7 +665,7 @@
                     }
                 }
             }
-        } catch (IOException e) {
+        } catch (Exception e) {
             LOG.error("Problem retrieving message in browse() ", e);
         }
         return l.toArray(new Message[l.size()]);
@@ -899,7 +907,7 @@
         int movedCounter = 0;
         Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
         do {
-            pageInMessages();
+            doPageIn(true);
             synchronized (pagedInMessages) {
                 set.addAll(pagedInMessages.values());
             }
@@ -981,7 +989,7 @@
 	                e.printStackTrace();
 	            }
 	        }
-	
+	        
 	        boolean pageInMoreMessages = false;
 	        synchronized (messages) {
 	            pageInMoreMessages = !messages.isEmpty();
@@ -1230,6 +1238,7 @@
                                 pagedInPendingDispatch.add(qmr);
                             }
                         }
+                        wakeup();
                     }
                 }
             }
@@ -1268,11 +1277,16 @@
                         }
                     }
                     interestCount++;
+                } else {
+                	// makes sure it gets dispatched again
+                	if (!node.isDropped() && !((QueueMessageReference)node).isAcked() && (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
+                		interestCount++;
+                	}
                 }
             }
             
-            if (target == null && interestCount>0) {
-                // This means all subs were full...
+            if ((target == null && interestCount>0) || consumers.size() == 0) {
+                // This means all subs were full or that there are no consumers...
                 rc.add((QueueMessageReference)node);
             }
 
@@ -1288,10 +1302,6 @@
             }
         }
 
-        //LOG.info(getName()+" Pending messages:");
-        //for (MessageReference n : rc) {
-       //     LOG.info(getName()+"  - " + n.getMessageId());
-       // }
         
         return rc;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java?rev=736720&r1=736719&r2=736720&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java	Thu Jan 22 09:22:12 2009
@@ -70,7 +70,7 @@
         if (result) {
             result = exclusiveConsumer == null
                     || exclusiveConsumer == subscription;
-            if (result) {
+            if (result && !subscription.isFull()) {
                 QueueMessageReference node = (QueueMessageReference) m;
                 // Keep message groups together.
                 String groupId = node.getGroupID();



Mime
View raw message