activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r638924 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: AbstractRegion.java Queue.java QueueBrowserSubscription.java TempQueue.java
Date Wed, 19 Mar 2008 16:29:30 GMT
Author: chirino
Date: Wed Mar 19 09:29:26 2008
New Revision: 638924

URL: http://svn.apache.org/viewvc?rev=638924&view=rev
Log:
Do the initial recovery dispatch in the iterate() thread so that the addSubscription() operation
on the Queue executes quickly.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=638924&r1=638923&r2=638924&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Wed Mar 19 09:29:26 2008
@@ -265,7 +265,7 @@
             }
 
             if (info.isBrowser()) {
-                ((QueueBrowserSubscription)sub).browseDone();
+                ((QueueBrowserSubscription)sub).destinationsAdded();
             }
 
             return sub;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=638924&r1=638923&r2=638924&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed Mar 19 09:29:26 2008
@@ -99,6 +99,7 @@
             wakeup();
         }
     };
+    
     private static final Comparator<Subscription>orderedCompare = new Comparator<Subscription>()
{
 
         public int compare(Subscription s1, Subscription s2) {
@@ -195,14 +196,19 @@
         }
     }
 
+    class RecoveryDispatch {
+        public ArrayList<QueueMessageReference> messages;
+        public Subscription subscription;
+    }
    
+    LinkedList<RecoveryDispatch> recoveries = new LinkedList<RecoveryDispatch>();
 
     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception
{
         dispatchLock.lock();
         try {
             sub.add(context, this);
             destinationStatistics.getConsumers().increment();
-            MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
+//            MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
 
             // needs to be synchronized - so no contention with dispatching
             synchronized (consumers) {
@@ -223,21 +229,33 @@
             // duplicates
             // etc.
             doPageIn(false);
-            msgContext.setDestination(destination);
+//            msgContext.setDestination(destination);
+
             synchronized (pagedInMessages) {
-                // Add all the matching messages in the queue to the
-                // subscription.
-                
-                for (QueueMessageReference node:pagedInMessages.values()){
-                    if (!node.isDropped() && !node.isAcked() && (!node.isDropped()
||sub.getConsumerInfo().isBrowser())) {
-                        msgContext.setMessageReference(node);
-                        if (sub.matches(node, msgContext)) {
-                            sub.add(node);
-                        }
-                    }
-                }
-                
-            }
+                RecoveryDispatch rd = new RecoveryDispatch();
+                rd.messages =  new ArrayList<QueueMessageReference>(pagedInMessages.values());
+                rd.subscription = sub;
+                recoveries.addLast(rd);
+            }
+            
+            if( sub instanceof QueueBrowserSubscription ) {
+                ((QueueBrowserSubscription)sub).incrementQueueRef();
+            }
+            
+//                System.out.println(new Date()+": Locked pagedInMessages: "+sub.getConsumerInfo().getConsumerId());
+//                // Add all the matching messages in the queue to the
+//                // subscription.
+//                
+//                for (QueueMessageReference node:pagedInMessages.values()){
+//                    if (!node.isDropped() && !node.isAcked() && (!node.isDropped()
||sub.getConsumerInfo().isBrowser())) {
+//                        msgContext.setMessageReference(node);
+//                        if (sub.matches(node, msgContext)) {
+//                            sub.add(node);
+//                        }
+//                    }
+//                }
+//                
+//            }
             wakeup();
         }finally {
             dispatchLock.unlock();
@@ -880,16 +898,56 @@
         } while (count < this.destinationStatistics.getMessages().getCount());
         return movedCounter;
     }
+    
+    RecoveryDispatch getNextRecoveryDispatch() {
+        synchronized (pagedInMessages) {
+            if( recoveries.isEmpty() ) {
+                return null;
+            }
+            return recoveries.removeFirst();
+        }
+
+    }
+    protected boolean isRecoveryDispatchEmpty() {
+        synchronized (pagedInMessages) {
+            return recoveries.isEmpty();
+        }
+    }
 
     /**
      * @return true if we would like to iterate again
      * @see org.apache.activemq.thread.Task#iterate()
      */
     public boolean iterate() {
+        
+        RecoveryDispatch rd;
+        while ((rd = getNextRecoveryDispatch()) != null) {
+            try {
+                MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
+                msgContext.setDestination(destination);
+    
+                for (QueueMessageReference node : rd.messages) {
+                    if (!node.isDropped() && !node.isAcked() && (!node.isDropped()
|| rd.subscription.getConsumerInfo().isBrowser())) {
+                        msgContext.setMessageReference(node);
+                            if (rd.subscription.matches(node, msgContext)) {
+                                rd.subscription.add(node);
+                            }
+                    }
+                }
+                
+                if( rd.subscription instanceof QueueBrowserSubscription ) {
+                    ((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
         boolean result = false;
         synchronized (messages) {
             result = !messages.isEmpty();
-        }
+        }               
+        
         if (result) {
             try {
                pageInMessages(false);
@@ -1027,46 +1085,53 @@
     
     private void doDispatch(List<QueueMessageReference> list) throws Exception {
         if (list != null) {
-            synchronized (consumers) {
-                for (MessageReference node : list) {
-                    Subscription target = null;
-                    List<Subscription> targets = null;
-                    for (Subscription s : consumers) {
-                        if (dispatchSelector.canSelect(s, node)) {
-                            if (!s.isFull()) {
-                                s.add(node);
-                                node.incrementReferenceCount();
-                                target = s;
-                                break;
-                            } else {
-                                if (targets == null) {
-                                    targets = new ArrayList<Subscription>();
-                                }
-                                targets.add(s);
+            List<Subscription> consumers;
+            synchronized (this.consumers) {
+                consumers = new ArrayList<Subscription>(this.consumers);
+            }
+            
+            for (MessageReference node : list) {
+                Subscription target = null;
+                List<Subscription> targets = null;
+                for (Subscription s : consumers) {
+                    if (dispatchSelector.canSelect(s, node)) {
+                        if (!s.isFull()) {
+                            s.add(node);
+                            node.incrementReferenceCount();
+                            target = s;
+                            break;
+                        } else {
+                            if (targets == null) {
+                                targets = new ArrayList<Subscription>();
                             }
+                            targets.add(s);
                         }
                     }
-                    if (target == null && targets != null) {
-                        // pick the least loaded to add the message too
-                        for (Subscription s : targets) {
-                            if (target == null
-                                    || target.getInFlightUsage() > s
-                                            .getInFlightUsage()) {
-                                target = s;
-                            }
-                        }
-                        if (target != null) {
-                            target.add(node);
-                            node.incrementReferenceCount();
+                }
+                if (target == null && targets != null) {
+                    // pick the least loaded to add the message too
+                    for (Subscription s : targets) {
+                        if (target == null
+                                || target.getInFlightUsage() > s
+                                        .getInFlightUsage()) {
+                            target = s;
                         }
                     }
-                    if (target != null && !strictOrderDispatch && consumers.size()
> 1 &&
-                             !dispatchSelector.isExclusiveConsumer(target)) {
-                        removeFromConsumerList(target);
-                        addToConsumerList(target);
+                    if (target != null) {
+                        target.add(node);
+                        node.incrementReferenceCount();
                     }
-
                 }
+                if (target != null && !strictOrderDispatch && consumers.size()
> 1 &&
+                         !dispatchSelector.isExclusiveConsumer(target)) {
+                    synchronized (this.consumers) {
+                        if( removeFromConsumerList(target) ) {
+                            addToConsumerList(target);
+                            consumers = new ArrayList<Subscription>(this.consumers);
+                        }
+                    }
+                }
+
             }
         }
     }
@@ -1094,8 +1159,8 @@
         }
     }
     
-    private void removeFromConsumerList(Subscription sub) {
-        consumers.remove(sub);
+    private boolean removeFromConsumerList(Subscription sub) {
+        return consumers.remove(sub);
     }
     
     private int getConsumerMessageCountBeforeFull() throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=638924&r1=638923&r2=638924&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
Wed Mar 19 09:29:26 2008
@@ -29,7 +29,9 @@
 
 public class QueueBrowserSubscription extends QueueSubscription {
 
+    int queueRefs;
     boolean browseDone;
+    boolean destinationsAdded;
 
     public QueueBrowserSubscription(Broker broker,Destination destination, SystemUsage usageManager,
ConnectionContext context, ConsumerInfo info)
         throws InvalidSelectorException {
@@ -46,9 +48,16 @@
                + this.prefetchExtension + ", pending=" + getPendingQueueSize();
     }
 
-    public void browseDone() throws Exception {
-        browseDone = true;
-        add(QueueMessageReference.NULL_MESSAGE);
+    synchronized public void destinationsAdded() throws Exception {
+        destinationsAdded = true;
+        checkDone();
+    }
+
+    private void checkDone() throws Exception {
+        if( !browseDone && queueRefs == 0 && destinationsAdded) {
+            browseDone=true;
+            add(QueueMessageReference.NULL_MESSAGE);
+        }
     }
 
     public boolean matches(MessageReference node, MessageEvaluationContext context) throws
IOException {
@@ -60,6 +69,15 @@
      */
     protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference
n)
         throws IOException {
+    }
+
+    synchronized public void incrementQueueRef() {
+        queueRefs++;        
+    }
+
+    synchronized public void decrementQueueRef() throws Exception {
+        queueRefs--;
+        checkDone();
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=638924&r1=638923&r2=638924&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
Wed Mar 19 09:29:26 2008
@@ -87,7 +87,7 @@
                 log.error("Failed to page in more queue messages ", e);
             }
         }
-        if (!messagesWaitingForSpace.isEmpty()) {
+        if (!messagesWaitingForSpace.isEmpty() || !isRecoveryDispatchEmpty()) {
             try {
                 taskRunner.wakeup();
             } catch (InterruptedException e) {



Mime
View raw message