activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r359547 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region: Topic.java policy/DispatchPolicy.java policy/RoundRobinDispatchPolicy.java policy/SimpleDispatchPolicy.java policy/StrictOrderDispatchPolicy.java
Date Wed, 28 Dec 2005 15:33:10 GMT
Author: jstrachan
Date: Wed Dec 28 07:33:05 2005
New Revision: 359547

URL: http://svn.apache.org/viewcvs?rev=359547&view=rev
Log:
minor refactor to provide a hook when dispatching messages which have no consumers, so that
we can, for example, send them to a dead letter queue

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/DispatchPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SimpleDispatchPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/StrictOrderDispatchPolicy.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java?rev=359547&r1=359546&r2=359547&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java Wed Dec 28 07:33:05 2005
@@ -273,17 +273,30 @@
             if (! subscriptionRecoveryPolicy.add(context, message)) {
                 return;
             }
-            if (consumers.isEmpty())
+            if (consumers.isEmpty()) {
+                onMessageWithNoConsumers(context, message);
                 return;
+            }
 
             msgContext.setDestination(destination);
             msgContext.setMessageReference(message);
             
-            dispatchPolicy.dispatch(context, message, msgContext, consumers);
+            if (!dispatchPolicy.dispatch(context, message, msgContext, consumers)) {
+                onMessageWithNoConsumers(context, message);
+            }
         }
         finally {
             msgContext.clear();
             dispatchValve.decrement();
+        }
+    }
+
+    /** 
+     * Provides a hook to allow messages with no consumer to be processed in some way - such as to send to a dead letter queue or something..
+     */
+    protected void onMessageWithNoConsumers(ConnectionContext context, Message message) {
+        if (! message.isPersistent()) {
+            // allow messages with no consumers to be dispatched to a dead letter queue
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/DispatchPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/DispatchPolicy.java?rev=359547&r1=359546&r2=359547&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/DispatchPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/DispatchPolicy.java Wed Dec 28 07:33:05 2005
@@ -41,7 +41,9 @@
      * large pre-fetch may take all the messages if he is always dispatched to first.  
      * Once a message has been locked, it does not need to be dispatched to any 
      * further subscriptions.
+     * 
+     * @return true if at least one consumer was dispatched or false if there are no active subscriptions that could be dispatched
      */
-    void dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable;
+    boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable;
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/RoundRobinDispatchPolicy.java?rev=359547&r1=359546&r2=359547&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/RoundRobinDispatchPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/RoundRobinDispatchPolicy.java Wed Dec 28 07:33:05 2005
@@ -37,12 +37,13 @@
 
     private final Object mutex = new Object();
     
-    public void dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable {
+    public boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable {
         
         // Big synch here so that only 1 message gets dispatched at a time.  Ensures 
         // Everyone sees the same order and that the consumer list is not used while
         // it's being rotated.
         synchronized(mutex) {
+            int count = 0;
             
             for (Iterator iter = consumers.iterator(); iter.hasNext();) {
                 Subscription sub = (Subscription) iter.next();
@@ -52,6 +53,7 @@
                     continue;
                 
                 sub.add(node);
+                count++;
             }
             
             // Rotate the consumer list.
@@ -59,6 +61,7 @@
                 consumers.add(consumers.remove(0));
             } catch (Throwable bestEffort) {
             }
+            return count > 0;
         }        
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SimpleDispatchPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SimpleDispatchPolicy.java?rev=359547&r1=359546&r2=359547&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SimpleDispatchPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SimpleDispatchPolicy.java Wed Dec 28 07:33:05 2005
@@ -35,8 +35,8 @@
  */
 public class SimpleDispatchPolicy implements DispatchPolicy {
 
-    public void dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable {
-        
+    public boolean dispatch(ConnectionContext context, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable {
+        int count = 0;
         for (Iterator iter = consumers.iterator(); iter.hasNext();) {
             Subscription sub = (Subscription) iter.next();
             
@@ -48,7 +48,9 @@
                 continue;
             
             sub.add(node);
+            count++;
         }
+        return count > 0;
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/StrictOrderDispatchPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/StrictOrderDispatchPolicy.java?rev=359547&r1=359546&r2=359547&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/StrictOrderDispatchPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/StrictOrderDispatchPolicy.java Wed Dec 28 07:33:05 2005
@@ -36,11 +36,11 @@
     int i=0;
     private final Object mutex = new Object();
     
-    public void dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable {
-        
+    public boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable {
         // Big synch here so that only 1 message gets dispatched at a time.  Ensures 
         // Everyone sees the same order.
         synchronized(mutex) {
+            int count = 0;
             i++;
             for (Iterator iter = consumers.iterator(); iter.hasNext();) {
                 Subscription sub = (Subscription) iter.next();
@@ -50,7 +50,9 @@
                     continue;
                 
                 sub.add(node);
+                count++;
             }
+            return count > 0;
         }
     }
 



Mime
View raw message