activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r745031 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apache/activemq/broker/policy/
Date Tue, 17 Feb 2009 12:56:25 GMT
Author: dejanb
Date: Tue Feb 17 12:56:24 2009
New Revision: 745031

URL: http://svn.apache.org/viewvc?rev=745031&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2120

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=745031&r1=745030&r2=745031&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Tue Feb 17 12:56:24 2009
@@ -17,12 +17,18 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
@@ -57,6 +63,7 @@
     private boolean advisoryWhenFull;
     private boolean advisoryForDelivery;
     private boolean advisoryForConsumed;
+    private boolean sendAdvisoryIfNoConsumers;
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
     protected final BrokerService brokerService;
     protected final Broker regionBroker;
@@ -323,6 +330,14 @@
     public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
         this.advisdoryForFastProducers = advisdoryForFastProducers;
     }
+    
+    public boolean isSendAdvisoryIfNoConsumers() {
+        return sendAdvisoryIfNoConsumers;
+    }
+
+    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
+        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
+    }
 
     /**
      * @return the dead letter strategy
@@ -420,4 +435,54 @@
         this.destinationStatistics.setParent(null);
         this.memoryUsage.stop();
     }
+    
+    /**
+     * Provides a hook to allow messages with no consumer to be processed in
+     * some way - such as to send to a dead letter queue or something..
+     */
+    protected void onMessageWithNoConsumers(ConnectionContext context, Message message) throws Exception {
+    	if (!message.isPersistent()) {
+            if (isSendAdvisoryIfNoConsumers()) {
+                // allow messages with no consumers to be dispatched to a dead
+                // letter queue
+                if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
+
+                    // The original destination and transaction id do not get
+                    // filled when the message is first sent,
+                    // it is only populated if the message is routed to another
+                    // destination like the DLQ
+                    if (message.getOriginalDestination() != null) {
+                        message.setOriginalDestination(message.getDestination());
+                    }
+                    if (message.getOriginalTransactionId() != null) {
+                        message.setOriginalTransactionId(message.getTransactionId());
+                    }
+                    
+                    ActiveMQTopic advisoryTopic;
+                    if (destination.isQueue()) {
+                    	advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
+                    } else {
+                    	advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
+                    }
+                    message.setDestination(advisoryTopic);
+                    message.setTransactionId(null);
+
+                    // Disable flow control for this since since we don't want
+                    // to block.
+                    boolean originalFlowControl = context.isProducerFlowControl();
+                    try {
+                        context.setProducerFlowControl(false);
+                        ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+                        producerExchange.setMutable(false);
+                        producerExchange.setConnectionContext(context);
+                        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
+                        context.getBroker().send(producerExchange, message);
+                    } finally {
+                        context.setProducerFlowControl(originalFlowControl);
+                    }
+
+                }
+            }
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=745031&r1=745030&r2=745031&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Feb 17 12:56:24 2009
@@ -37,6 +37,7 @@
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -50,6 +51,7 @@
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Message;
@@ -62,6 +64,7 @@
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.thread.DeterministicTaskRunner;
@@ -1210,6 +1213,11 @@
         destinationStatistics.getEnqueues().increment();
         destinationStatistics.getMessages().increment();
         messageDelivered(context, msg);
+        synchronized (consumers) {
+            if (consumers.isEmpty()) {
+                onMessageWithNoConsumers(context, msg);
+            }
+        }
         wakeup();
     }
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=745031&r1=745030&r2=745031&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Feb 17 12:56:24 2009
@@ -70,7 +70,6 @@
     protected final Valve dispatchValve = new Valve(true);   
     private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
-    private boolean sendAdvisoryIfNoConsumers;
     private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
     private final TaskRunner taskRunner;
     private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
@@ -541,14 +540,6 @@
         this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
     }
 
-    public boolean isSendAdvisoryIfNoConsumers() {
-        return sendAdvisoryIfNoConsumers;
-    }
-
-    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
-        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
-    }
-
     
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -601,48 +592,5 @@
         }
     }
 
-    /**
-     * Provides a hook to allow messages with no consumer to be processed in
-     * some way - such as to send to a dead letter queue or something..
-     */
-    protected void onMessageWithNoConsumers(ConnectionContext context, Message message) throws Exception {
-        if (!message.isPersistent()) {
-            if (sendAdvisoryIfNoConsumers) {
-                // allow messages with no consumers to be dispatched to a dead
-                // letter queue
-                if (!AdvisorySupport.isAdvisoryTopic(destination)) {
-
-                    // The original destination and transaction id do not get
-                    // filled when the message is first sent,
-                    // it is only populated if the message is routed to another
-                    // destination like the DLQ
-                    if (message.getOriginalDestination() != null) {
-                        message.setOriginalDestination(message.getDestination());
-                    }
-                    if (message.getOriginalTransactionId() != null) {
-                        message.setOriginalTransactionId(message.getTransactionId());
-                    }
-
-                    ActiveMQTopic advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
-                    message.setDestination(advisoryTopic);
-                    message.setTransactionId(null);
-
-                    // Disable flow control for this since since we don't want
-                    // to block.
-                    boolean originalFlowControl = context.isProducerFlowControl();
-                    try {
-                        context.setProducerFlowControl(false);
-                        ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
-                        producerExchange.setMutable(false);
-                        producerExchange.setConnectionContext(context);
-                        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
-                        context.getBroker().send(producerExchange, message);
-                    } finally {
-                        context.setProducerFlowControl(originalFlowControl);
-                    }
 
-                }
-            }
-        }
-    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=745031&r1=745030&r2=745031&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Tue Feb 17 12:56:24 2009
@@ -110,7 +110,6 @@
         if (subscriptionRecoveryPolicy != null) {
             topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy());
         }
-        topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
         if (memoryLimit > 0) {
             topic.getMemoryUsage().setLimit(memoryLimit);
         }
@@ -132,6 +131,7 @@
         destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
         destination.setAdvisdoryForFastProducers(isAdvisdoryForFastProducers());
         destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
+        destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java?rev=745031&r1=745030&r2=745031&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java Tue Feb 17 12:56:24 2009
@@ -37,9 +37,6 @@
     public void testDurableTopicMessage() throws Exception {
     }
 
-    public void testTransientQueueMessage() throws Exception {
-    }
-
     protected void doTest() throws Exception {
         makeDlqConsumer();
         sendMessages();
@@ -65,7 +62,11 @@
     }
 
     protected Destination createDlqDestination() {
-        return AdvisorySupport.getNoTopicConsumersAdvisoryTopic((ActiveMQDestination)getDestination());
+    	if (this.topic) {
+    		return AdvisorySupport.getNoTopicConsumersAdvisoryTopic((ActiveMQDestination)getDestination());
+    	} else {
+    		return AdvisorySupport.getNoQueueConsumersAdvisoryTopic((ActiveMQDestination)getDestination());
+    	}
     }
 
 }



Mime
View raw message