activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r360329 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java Topic.java
Date Sat, 31 Dec 2005 16:51:05 GMT
Author: chirino
Date: Sat Dec 31 08:50:52 2005
New Revision: 360329

URL: http://svn.apache.org/viewcvs?rev=360329&view=rev
Log:
imporoved the advisory message being sent by topics when no consumer is listening:
 - It sends it non transactional
 - It does does not block due to flow control

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=360329&r1=360328&r2=360329&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Sat Dec 31 08:50:52 2005
@@ -152,6 +152,8 @@
             
         } else if( ack.isPoisonAck() ) {
             
+            // TODO: what if the message is already in a DLQ???
+            
             // Handle the poison ACK case: we need to send the message to a DLQ  
             if( ack.isInTransaction() )
                 throw new JMSException("Poison ack cannot be transacted: "+ack);
@@ -175,20 +177,16 @@
                         Message message = node.getMessage();
                         if( message !=null ) {
                             
-                            // TODO is this meant to be == null?
+                            // The original destination and transaction id do not get filled
when the message is first sent,
+                            // it is only populated if the message is routed to another destination
like the DLQ
                             if( message.getOriginalDestination()!=null )
                                 message.setOriginalDestination(message.getDestination());
+                            if( message.getOriginalTransactionId()!=null )
+                                message.setOriginalTransactionId(message.getTransactionId());
                             
-                            ActiveMQDestination originalDestination = message.getOriginalDestination();
-                            if (originalDestination == null) {
-                                originalDestination = message.getDestination();
-                            }
                             DeadLetterStrategy deadLetterStrategy = node.getRegionDestination().getDeadLetterStrategy();
-                            ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(originalDestination);
+                            ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message.getDestination());
                             message.setDestination(deadLetterDestination);
-                            
-                            if( message.getOriginalTransactionId()!=null )
-                                message.setOriginalTransactionId(message.getTransactionId());
                             message.setTransactionId(null);
                             message.evictMarshlledForm();
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=360329&r1=360328&r2=360329&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Sat Dec 31 08:50:52 2005
@@ -338,11 +338,29 @@
             if (sendAdvisoryIfNoConsumers) {
                 // allow messages with no consumers to be dispatched to a dead
                 // letter queue
-                ActiveMQDestination originalDestination = message.getDestination();
-                if (!AdvisorySupport.isAdvisoryTopic(originalDestination)) {
-                    ActiveMQTopic advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(originalDestination);
+                if (!AdvisorySupport.isAdvisoryTopic(destination)) {
+                    
+                    // The original destination and transaction id do not get filled when
the message is first sent,
+                    // it is only populated if the message is routed to another destination
like the DLQ
+                    if( message.getOriginalDestination()!=null )
+                        message.setOriginalDestination(message.getDestination());
+                    if( message.getOriginalTransactionId()!=null )
+                        message.setOriginalTransactionId(message.getTransactionId());
+
+                    ActiveMQTopic advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
                     message.setDestination(advisoryTopic);
-                    context.getBroker().send(context, message);
+                    message.setTransactionId(null);
+                    message.evictMarshlledForm();
+
+                    // Disable flow control for this since since we don't want to block.
+                    boolean originalFlowControl = context.isProducerFlowControl();
+                    try {
+                        context.setProducerFlowControl(false);
+                        context.getBroker().send(context, message);
+                    } finally {
+                        context.setProducerFlowControl(originalFlowControl);
+                    }
+                    
                 }
             }
         }



Mime
View raw message