activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r359800 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: advisory/AdvisorySupport.java broker/region/Topic.java broker/region/policy/PolicyEntry.java
Date Thu, 29 Dec 2005 14:12:51 GMT
Author: jstrachan
Date: Thu Dec 29 06:12:46 2005
New Revision: 359800

URL: http://svn.apache.org/viewcvs?rev=359800&view=rev
Log:
enable by default that non-persistent topic messages which have no consumers are sent to a
dead letter topic

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=359800&r1=359799&r2=359800&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java Thu Dec 29 06:12:46 2005
@@ -29,6 +29,10 @@
     public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX+"TempTopic");
     public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Producer.";
     public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Consumer.";
+    public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Expired.Topic.";
+    public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Expired.Queue.";
+    public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"NoConsumer.Topic.";
+    public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"NoConsumer.Queue.";
     public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
     public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC+","+TEMP_TOPIC_ADVISORY_TOPIC);
 
@@ -43,6 +47,26 @@
     
     public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
         String name = PRODUCER_ADVISORY_TOPIC_PREFIX+destination.getQualifiedName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
+        String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX+destination.getQualifiedName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) {
+        String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX+destination.getQualifiedName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) {
+        String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX+destination.getQualifiedName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) {
+        String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX+destination.getQualifiedName();
         return new ActiveMQTopic(name);
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=359800&r1=359799&r2=359800&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Dec 29 06:12:46 2005
@@ -18,12 +18,14 @@
 
 import java.io.IOException;
 
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
@@ -57,9 +59,10 @@
 
     private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
+    private boolean sendAdvisoryIfNoConsumers = true;
 
-    public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager,
-            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) {
+    public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
+            TaskRunnerFactory taskFactory) {
 
         this.destination = destination;
         this.store = store;
@@ -166,33 +169,34 @@
 
     public void send(final ConnectionContext context, final Message message) throws Throwable {
 
-        if( context.isProducerFlowControl() )
+        if (context.isProducerFlowControl())
             usageManager.waitForSpace();
-        
+
         message.setRegionDestination(this);
-        
+
         if (store != null && message.isPersistent())
             store.addMessage(context, message);
 
         message.incrementReferenceCount();
         try {
-    
+
             if (context.isInTransaction()) {
                 context.getTransaction().addSynchronization(new Synchronization() {
                     public void afterCommit() throws Throwable {
                         dispatch(context, message);
                     }
                 });
-    
+
             }
             else {
                 dispatch(context, message);
             }
-            
-        } finally {
+
+        }
+        finally {
             message.decrementReferenceCount();
         }
-        
+
     }
 
     public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
@@ -236,7 +240,7 @@
 
     // Properties
     // -------------------------------------------------------------------------
-    
+
     public UsageManager getUsageManager() {
         return usageManager;
     }
@@ -265,12 +269,26 @@
         this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
     }
 
+    public boolean isSendAdvisoryIfNoConsumers() {
+        return sendAdvisoryIfNoConsumers;
+    }
+
+    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
+        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
+    }
+
+    public MessageStore getMessageStore() {
+        return store;
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
     protected void dispatch(ConnectionContext context, Message message) throws Throwable {
         destinationStatistics.getEnqueues().increment();
         dispatchValve.increment();
         MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
         try {
-            if (! subscriptionRecoveryPolicy.add(context, message)) {
+            if (!subscriptionRecoveryPolicy.add(context, message)) {
                 return;
             }
             if (consumers.isEmpty()) {
@@ -280,7 +298,7 @@
 
             msgContext.setDestination(destination);
             msgContext.setMessageReference(message);
-            
+
             if (!dispatchPolicy.dispatch(context, message, msgContext, consumers)) {
                 onMessageWithNoConsumers(context, message);
             }
@@ -291,17 +309,23 @@
         }
     }
 
-    /** 
-     * Provides a hook to allow messages with no consumer to be processed in some way - such as to send to a dead letter queue or something..
+    /**
+     * Provides a hook to allow messages with no consumer to be processed in
+     * some way - such as to send to a dead letter queue or something..
      */
-    protected void onMessageWithNoConsumers(ConnectionContext context, Message message) {
-        if (! message.isPersistent()) {
-            // allow messages with no consumers to be dispatched to a dead letter queue
+    protected void onMessageWithNoConsumers(ConnectionContext context, Message message) throws Throwable {
+        if (!message.isPersistent()) {
+            if (sendAdvisoryIfNoConsumers) {
+                // allow messages with no consumers to be dispatched to a dead
+                // letter queue
+                ActiveMQDestination originalDestination = message.getDestination();
+                if (!AdvisorySupport.isAdvisoryTopic(originalDestination)) {
+                    ActiveMQTopic advisoryTopic = AdvisorySupport.getExpiredTopicMessageAdvisoryTopic(originalDestination);
+                    message.setDestination(advisoryTopic);
+                    context.getBroker().send(context, message);
+                }
+            }
         }
-    }
-
-    public MessageStore getMessageStore() {
-        return store;
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=359800&r1=359799&r2=359800&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Thu Dec 29 06:12:46 2005
@@ -23,7 +23,7 @@
 
 /**
  * Represents an entry in a {@link PolicyMap} for assigning policies to a
- * specific destination or a hierarchial wildcard area of destinations.
+ * specific destination or a hierarchical wildcard area of destinations.
  * 
  * @org.xbean.XBean
  * 
@@ -34,6 +34,7 @@
     private DispatchPolicy dispatchPolicy;
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
     private RedeliveryPolicy redeliveryPolicy;
+    private boolean sendAdvisoryIfNoConsumers = true;
 
     public void configure(Queue queue) {
         if (dispatchPolicy != null) {
@@ -48,6 +49,7 @@
         if (subscriptionRecoveryPolicy != null) {
             topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy);
         }
+        topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
     }
 
     // Properties
@@ -76,4 +78,15 @@
         this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
     }
 
+    public boolean isSendAdvisoryIfNoConsumers() {
+        return sendAdvisoryIfNoConsumers;
+    }
+
+    /**
+     * Sends an advisory message if a non-persistent message is sent and there
+     * are no active consumers
+     */
+    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
+        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
+    }
 }



Mime
View raw message