activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r643390 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: Destination.java DestinationFilter.java PrefetchSubscription.java Queue.java
Date Tue, 01 Apr 2008 13:23:52 GMT
Author: chirino
Date: Tue Apr  1 06:22:48 2008
New Revision: 643390

URL: http://svn.apache.org/viewvc?rev=643390&view=rev
Log:
When pulling a message, iterate the destinations first to make sure that it has pushed all
available messages to the sub.  This should fix the ZeroPrefetchTest that was intermittently
failing on slower machines.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=643390&r1=643389&r2=643390&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Tue Apr  1 06:22:48 2008
@@ -27,12 +27,13 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.thread.Task;
 import org.apache.activemq.usage.MemoryUsage;
 
 /**
  * @version $Revision: 1.12 $
  */
-public interface Destination extends Service {
+public interface Destination extends Service, Task {
 
     void addSubscription(ConnectionContext context, Subscription sub) throws Exception;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=643390&r1=643389&r2=643390&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Tue Apr  1 06:22:48 2008
@@ -206,4 +206,8 @@
     public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription,
MessageReference node) {
         next.messageExpired(context, prefetchSubscription, node);        
     }
+
+	public boolean iterate() {
+		return next.iterate();
+	}
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=643390&r1=643389&r2=643390&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Tue Apr  1 06:22:48 2008
@@ -79,32 +79,43 @@
     /**
      * Allows a message to be pulled on demand by a client
      */
-    public synchronized Response pullMessage(ConnectionContext context, MessagePull pull)
throws Exception {
+    public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception
{
         // The slave should not deliver pull messages. TODO: when the slave
         // becomes a master,
         // He should send a NULL message to all the consumers to 'wake them up'
         // in case
         // they were waiting for a message.
         if (getPrefetchSize() == 0 && !isSlave()) {
-            prefetchExtension++;
-            final long dispatchCounterBeforePull = dispatchCounter;
-            dispatchPending();
-            // If there was nothing dispatched.. we may need to setup a timeout.
-            if (dispatchCounterBeforePull == dispatchCounter) {
-                // imediate timeout used by receiveNoWait()
-                if (pull.getTimeout() == -1) {
-                    // Send a NULL message.
-                    add(QueueMessageReference.NULL_MESSAGE);
-                    dispatchPending();
-                }
-                if (pull.getTimeout() > 0) {
-                    Scheduler.executeAfterDelay(new Runnable() {
-
-                        public void run() {
-                            pullTimeout(dispatchCounterBeforePull);
-                        }
-                    }, pull.getTimeout());
-                }
+            final long dispatchCounterBeforePull;
+        	synchronized(this) {
+        		prefetchExtension++;
+        		dispatchCounterBeforePull = dispatchCounter;
+        	}
+            
+        	// Have the destination push us some messages.
+        	for (Destination dest : destinations) {
+				dest.iterate();
+			}
+        	dispatchPending();
+            
+            synchronized(this) {
+	            // If there was nothing dispatched.. we may need to setup a timeout.
+	            if (dispatchCounterBeforePull == dispatchCounter) {
+	                // imediate timeout used by receiveNoWait()
+	                if (pull.getTimeout() == -1) {
+	                    // Send a NULL message.
+	                    add(QueueMessageReference.NULL_MESSAGE);
+	                    dispatchPending();
+	                }
+	                if (pull.getTimeout() > 0) {
+	                    Scheduler.executeAfterDelay(new Runnable() {
+	
+	                        public void run() {
+	                            pullTimeout(dispatchCounterBeforePull);
+	                        }
+	                    }, pull.getTimeout());
+	                }
+	            }
             }
         }
         return null;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=643390&r1=643389&r2=643390&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Apr  1 06:22:48 2008
@@ -99,6 +99,7 @@
             wakeup();
         }
     };
+    private final Object iteratingMutex = new Object() {};
     
     private static final Comparator<Subscription>orderedCompare = new Comparator<Subscription>()
{
 
@@ -914,51 +915,52 @@
      * @see org.apache.activemq.thread.Task#iterate()
      */
     public boolean iterate() {
-        
-        RecoveryDispatch rd;
-        while ((rd = getNextRecoveryDispatch()) != null) {
-            try {
-                MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
-                msgContext.setDestination(destination);
-    
-                for (QueueMessageReference node : rd.messages) {
-                    if (!node.isDropped() && !node.isAcked() && (!node.isDropped()
|| rd.subscription.getConsumerInfo().isBrowser())) {
-                        msgContext.setMessageReference(node);
-                            if (rd.subscription.matches(node, msgContext)) {
-                                rd.subscription.add(node);
-                            }
-                    }
-                }
-                
-                if( rd.subscription instanceof QueueBrowserSubscription ) {
-                    ((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
-                }
-                
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-
-        boolean result = false;
-        synchronized (messages) {
-            result = !messages.isEmpty();
-        }               
-        
-        if (result) {
-            try {
-               pageInMessages(false);
-               
-            } catch (Throwable e) {
-                log.error("Failed to page in more queue messages ", e);
-            }
-        }
-        synchronized(messagesWaitingForSpace) {
-               while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull())
{
-                   Runnable op = messagesWaitingForSpace.removeFirst();
-                   op.run();
-               }
+        synchronized(iteratingMutex) {
+	        RecoveryDispatch rd;
+	        while ((rd = getNextRecoveryDispatch()) != null) {
+	            try {
+	                MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
+	                msgContext.setDestination(destination);
+	    
+	                for (QueueMessageReference node : rd.messages) {
+	                    if (!node.isDropped() && !node.isAcked() && (!node.isDropped()
|| rd.subscription.getConsumerInfo().isBrowser())) {
+	                        msgContext.setMessageReference(node);
+	                            if (rd.subscription.matches(node, msgContext)) {
+	                                rd.subscription.add(node);
+	                            }
+	                    }
+	                }
+	                
+	                if( rd.subscription instanceof QueueBrowserSubscription ) {
+	                    ((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
+	                }
+	                
+	            } catch (Exception e) {
+	                e.printStackTrace();
+	            }
+	        }
+	
+	        boolean result = false;
+	        synchronized (messages) {
+	            result = !messages.isEmpty();
+	        }               
+	        
+	        if (result) {
+	            try {
+	               pageInMessages(false);
+	               
+	            } catch (Throwable e) {
+	                log.error("Failed to page in more queue messages ", e);
+	            }
+	        }
+	        synchronized(messagesWaitingForSpace) {
+	               while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull())
{
+	                   Runnable op = messagesWaitingForSpace.removeFirst();
+	                   op.run();
+	               }
+	        }
+	        return false;
         }
-        return false;
     }
 
     protected MessageReferenceFilter createMessageIdFilter(final String messageId) {



Mime
View raw message