activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r822797 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/BaseDestination.java test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java
Date Wed, 07 Oct 2009 16:37:29 GMT
Author: dejanb
Date: Wed Oct  7 16:37:28 2009
New Revision: 822797

URL: http://svn.apache.org/viewvc?rev=822797&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2443 - sendAdvisoryIfNoConsumers

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=822797&r1=822796&r2=822797&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Wed Oct  7 16:37:28 2009
@@ -25,6 +25,7 @@
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageDispatchNotification;
@@ -471,13 +472,14 @@
      * Provides a hook to allow messages with no consumer to be processed in
      * some way - such as to send to a dead letter queue or something..
      */
-    protected void onMessageWithNoConsumers(ConnectionContext context, Message message) throws
Exception { 	
-    	if (!message.isPersistent()) {
+    protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws
Exception {
+    	if (!msg.isPersistent()) {
             if (isSendAdvisoryIfNoConsumers()) {
                 // allow messages with no consumers to be dispatched to a dead
                 // letter queue
                 if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination))
{
 
+                    Message message = msg.copy();
                     // The original destination and transaction id do not get
                     // filled when the message is first sent,
                     // it is only populated if the message is routed to another

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java?rev=822797&r1=822796&r2=822797&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java
Wed Oct  7 16:37:28 2009
@@ -16,14 +16,23 @@
  */
 package org.apache.activemq.broker.policy;
 
+import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
 
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
 
 /**
  * @version $Revision$
@@ -46,6 +55,38 @@
             assertNotNull("Should be a message for loop: " + i, msg);
         }
     }
+    
+    public void testConsumerReceivesMessages() throws Exception {
+    	this.topic = false;
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        connection = (ActiveMQConnection)factory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(getDestination());
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+   
+        Topic advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(getDestination());
+        MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic);
+        
+        TextMessage msg = session.createTextMessage("Message: x");
+        producer.send(msg);
+        
+        Message advisoryMessage = advisoryConsumer.receive(1000);
+        assertNotNull("Advisory message not received", advisoryMessage);
+        
+        Thread.sleep(1000);
+        
+        factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        connection = (ActiveMQConnection)factory.createConnection();
+        connection.start();
+        
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        
+        MessageConsumer consumer = session.createConsumer(getDestination());
+        Message received = consumer.receive(1000);
+        assertNotNull("Message not received", received);
+    }
 
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = super.createBroker();



Mime
View raw message