activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r908060 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java test/java/org/apache/activemq/JmsQueueBrowserTest.java
Date Tue, 09 Feb 2010 14:47:08 GMT
Author: gtully
Date: Tue Feb  9 14:47:07 2010
New Revision: 908060

URL: http://svn.apache.org/viewvc?rev=908060&view=rev
Log:
Resolve failure of JDBCStoreBrokerTest on slow machines: identified a sync issue with dispatch
to queue browsers. Tidied up dispatch to queue browsers to tie it to normal dispatch and avoid
the timing/sync issue.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=908060&r1=908059&r2=908060&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Feb  9 14:47:07 2010
@@ -296,16 +296,13 @@
     }
 
     /*
-     * Holder for subscription and pagedInMessages as a browser needs access to
-     * existing messages in the queue that have already been dispatched
+     * Holder for subscription that needs attention on next iterate
+     * browser needs access to existing messages in the queue that have already been dispatched
      */
     class BrowserDispatch {
-        ArrayList<QueueMessageReference> messages;
         QueueBrowserSubscription browser;
 
-        public BrowserDispatch(QueueBrowserSubscription browserSubscription, Collection<QueueMessageReference> values) {
-
-            messages = new ArrayList<QueueMessageReference>(values);
+        public BrowserDispatch(QueueBrowserSubscription browserSubscription) {
             browser = browserSubscription;
             browser.incrementQueueRef();
         }
@@ -362,18 +359,14 @@
             }
 
             if (sub instanceof QueueBrowserSubscription) {
+                // tee up for dispatch in next iterate
                 QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
-
-                // do again in iterate to ensure new messages are dispatched
-                pageInMessages(false);
-
                 synchronized (pagedInMessages) {
-                    if (!pagedInMessages.isEmpty()) {
-                        BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription, pagedInMessages.values());
-                        browserDispatches.addLast(browserDispatch);
-                    }
+                    BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
+                    browserDispatches.addLast(browserDispatch);
                 }
             }
+            
             if (!(this.optimizedDispatch || isSlave())) {
                 wakeup();
             }
@@ -1157,7 +1150,7 @@
      * @see org.apache.activemq.thread.Task#iterate()
      */
     public boolean iterate() {
-        boolean pageInMoreMessages = false;
+        boolean pageInMoreMessages = false;       
         synchronized (iteratingMutex) {
 
             // do early to allow dispatch of these waiting messages
@@ -1175,31 +1168,6 @@
                 }
             }
 
-            BrowserDispatch rd;
-            while ((rd = getNextBrowserDispatch()) != null) {
-                pageInMoreMessages = true;
-
-                try {
-                    MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
-                    msgContext.setDestination(destination);
-
-                    QueueBrowserSubscription browser = rd.getBrowser();
-                    for (QueueMessageReference node : rd.messages) {
-                        if (!node.isAcked()) {
-                            msgContext.setMessageReference(node);
-                            if (browser.matches(node, msgContext)) {
-                                browser.add(node);
-                            }
-                        }
-                    }
-
-                    rd.done();
-
-                } catch (Exception e) {
-                    LOG.warn("exception on dispatch to browser: " + rd.getBrowser(), e);
-                }
-            }
-
             if (firstConsumer) {
                 firstConsumer = false;
                 try {
@@ -1228,6 +1196,8 @@
                     LOG.error(e);
                 }
             }
+            
+            BrowserDispatch pendingBrowserDispatch = getNextBrowserDispatch();
 
             synchronized (messages) {
                 pageInMoreMessages |= !messages.isEmpty();
@@ -1242,14 +1212,46 @@
             // Perhaps we should page always into the pagedInPendingDispatch list if 
             // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
             // then we do a dispatch.
-            if (pageInMoreMessages) {
+            if (pageInMoreMessages || pendingBrowserDispatch != null) {
                 try {
-                    pageInMessages(false);
+                    pageInMessages(pendingBrowserDispatch != null);
 
                 } catch (Throwable e) {
                     LOG.error("Failed to page in more queue messages ", e);
                 }
             }
+            
+            if (pendingBrowserDispatch != null) {
+                ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
+                synchronized (pagedInMessages) {
+                    alreadyDispatchedMessages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("dispatch to browser: " + pendingBrowserDispatch.getBrowser()
+                            + ", already dispatched/paged count: " + alreadyDispatchedMessages.size());
+                }
+                do {
+                    try {
+                        MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
+                        msgContext.setDestination(destination);
+                        
+                        QueueBrowserSubscription browser = pendingBrowserDispatch.getBrowser();
+                        for (QueueMessageReference node : alreadyDispatchedMessages) {
+                            if (!node.isAcked()) {
+                                msgContext.setMessageReference(node);
+                                if (browser.matches(node, msgContext)) {
+                                    browser.add(node);
+                                }
+                            }
+                        }
+                        pendingBrowserDispatch.done();
+                    } catch (Exception e) {
+                        LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e);
+                    }
+                
+                } while ((pendingBrowserDispatch = getNextBrowserDispatch()) != null);
+            }
+            
             if (pendingWakeups.get() > 0) {
                 pendingWakeups.decrementAndGet();
             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java?rev=908060&r1=908059&r2=908060&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java Tue Feb  9 14:47:07 2010
@@ -16,7 +16,9 @@
  */
 package org.apache.activemq;
 
+import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.List;
 
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -26,7 +28,14 @@
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
 
 /**
  * @version $Revision: 1.4 $
@@ -127,5 +136,50 @@
         browser.close();
         producer.close();
 
-    }    
+    }
+    
+    public void testQueueBrowserWith2Consumers() throws Exception {
+        final int numMessages = 1000;
+        connection.setAlwaysSyncSend(false);
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        ActiveMQQueue destinationPrefetch10 = new ActiveMQQueue("TEST?jms.prefetchSize=10");
+        ActiveMQQueue destinationPrefetch1 = new ActiveMQQueue("TEST?jms.prefetchsize=1");
     
+        connection.start();
+
+        ActiveMQConnection connection2 = (ActiveMQConnection)factory.createConnection(userName, password);
+        connection2.start();
+        connections.add(connection2);
+        Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(destination);
+        MessageConsumer consumer = session.createConsumer(destinationPrefetch10);
+  
+        for (int i=0; i<numMessages; i++) {
+            TextMessage message = session.createTextMessage("Message: " + i);
+            producer.send(message);   
+        }
+        
+        QueueBrowser browser = session2.createBrowser(destinationPrefetch1);
+        Enumeration<Message> browserView = browser.getEnumeration();
+    
+        List<Message> messages = new ArrayList<Message>();
+        for (int i = 0; i < numMessages; i++) {
+            Message m1 = consumer.receive(5000);
+            assertNotNull("m1 is null for index: " + i, m1);
+            messages.add(m1);
+        }
+
+        int i = 0;
+        for (; i < numMessages && browserView.hasMoreElements(); i++) {
+            Message m1 = messages.get(i);
+            Message m2 = browserView.nextElement();
+            assertNotNull("m2 is null for index: " + i, m2);
+            assertEquals(m1.getJMSMessageID(), m2.getJMSMessageID());
+        }
+        assertEquals("got all: ", numMessages, i);
+
+        assertFalse("nothing left in the browser", browserView.hasMoreElements());
+        assertNull("consumer finished", consumer.receiveNoWait());
+    }
 }



Mime
View raw message