activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1084023 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/a...
Date Mon, 21 Mar 2011 23:41:45 GMT
Author: gtully
Date: Mon Mar 21 23:41:44 2011
New Revision: 1084023

URL: http://svn.apache.org/viewvc?rev=1084023&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3003 - Allow the option of a DLQ per durable subscription
DeadLetterStrategy. Additional attribute destinationPerDurableSubscriber, when true causes
default dlq individual prefex to be prepended with the durab sub key, connectionid:subscriberName
pair. Modified DeadLetterStrategy interface to pass in message and subscription, to provide
fine level controll of dlq name. Also modified Broker interface to provide subscription to
message expiry and sendToDLQ methods.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PerDurableConsumerDeadLetterTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Mon Mar 21 23:41:44 2011
@@ -251,8 +251,8 @@ public class AdvisoryBroker extends Brok
     }
 
     @Override
-    public void messageExpired(ConnectionContext context, MessageReference messageReference)
{
-        super.messageExpired(context, messageReference);
+    public void messageExpired(ConnectionContext context, MessageReference messageReference,
Subscription subscription) {
+        super.messageExpired(context, messageReference, subscription);
         try {
             if(!messageReference.isAdvisory()) {
                 ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
@@ -376,8 +376,9 @@ public class AdvisoryBroker extends Brok
     }
     
     @Override
-    public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){
-        super.sendToDeadLetterQueue(context, messageReference);
+    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
+                                      Subscription subscription){
+        super.sendToDeadLetterQueue(context, messageReference, subscription);
         try {
             if(!messageReference.isAdvisory()) {
                 ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Mon
Mar 21 23:41:44 2011
@@ -305,19 +305,21 @@ public interface Broker extends Region, 
 
     /**
      * A Message has Expired
-     * 
+     *
      * @param context
      * @param messageReference
+     * @param subscription, may be null
      */
-    void messageExpired(ConnectionContext context, MessageReference messageReference);
+    void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription
subscription);
 
     /**
      * A message needs to go the a DLQ
      * 
      * @param context
      * @param messageReference
+     * @param subscription, may be null
      */
-    void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference);
+    void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
Subscription subscription);
     
     /**
      * @return the broker sequence id

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Mon Mar 21 23:41:44 2011
@@ -40,7 +40,6 @@ import org.apache.activemq.command.Remov
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.network.NetworkBridge;
 import org.apache.activemq.store.kahadb.plist.PListStore;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.Usage;
@@ -254,12 +253,13 @@ public class BrokerFilter implements Bro
         return next.isExpired(messageReference);
     }
 
-    public void messageExpired(ConnectionContext context, MessageReference message) {
-        next.messageExpired(context, message);
+    public void messageExpired(ConnectionContext context, MessageReference message, Subscription
subscription) {
+        next.messageExpired(context, message, subscription);
     }
 
-    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference)
{
-        next.sendToDeadLetterQueue(context, messageReference);
+    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
+                                      Subscription subscription) {
+        next.sendToDeadLetterQueue(context, messageReference, subscription);
     }
 
     public Broker getRoot() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
Mon Mar 21 23:41:44 2011
@@ -247,10 +247,12 @@ public class EmptyBroker implements Brok
         return false;
     }
 
-    public void messageExpired(ConnectionContext context, MessageReference message) {
+    public void messageExpired(ConnectionContext context, MessageReference message, Subscription
subscription) {
     }
 
-    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference)
{
+    public void sendToDeadLetterQueue(ConnectionContext context,
+                                      MessageReference messageReference,
+                                      Subscription subscription) {
     }
 
     public Broker getRoot() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
Mon Mar 21 23:41:44 2011
@@ -256,11 +256,12 @@ public class ErrorBroker implements Brok
         throw new BrokerStoppedException(this.message);
     }
 
-    public void messageExpired(ConnectionContext context, MessageReference message) {
+    public void messageExpired(ConnectionContext context, MessageReference message, Subscription
subscription) {
         throw new BrokerStoppedException(this.message);
     }
 
-    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference)
{
+    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
+                                      Subscription subscription) {
         throw new BrokerStoppedException(this.message);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
Mon Mar 21 23:41:44 2011
@@ -265,12 +265,13 @@ public class MutableBrokerFilter impleme
         return getNext().isExpired(messageReference);
     }
 
-    public void messageExpired(ConnectionContext context, MessageReference message) {
-        getNext().messageExpired(context, message);
+    public void messageExpired(ConnectionContext context, MessageReference message, Subscription
subscription) {
+        getNext().messageExpired(context, message, subscription);
     }
 
-    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference)
{
-        getNext().sendToDeadLetterQueue(context, messageReference);
+    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
+                                      Subscription subscription) {
+        getNext().sendToDeadLetterQueue(context, messageReference, subscription);
     }
 
     public Broker getRoot() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Mon Mar 21 23:41:44 2011
@@ -453,7 +453,7 @@ public abstract class PrefetchSubscripti
      * @throws Exception
      */
     protected void sendToDLQ(final ConnectionContext context, final MessageReference node)
throws IOException, Exception {
-        broker.getRoot().sendToDeadLetterQueue(context, node);
+        broker.getRoot().sendToDeadLetterQueue(context, node, this);
     }
     
     public int getInFlightSize() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Mon Mar 21 23:41:44 2011
@@ -539,7 +539,7 @@ public class Queue extends BaseDestinati
                 && !context.isInRecoveryMode();
         if (message.isExpired()) {
             // message not stored - or added to stats yet - so chuck here
-            broker.getRoot().messageExpired(context, message);
+            broker.getRoot().messageExpired(context, message, null);
             if (sendProducerAck) {
                 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
                 context.getConnection().dispatchAsync(ack);
@@ -591,7 +591,7 @@ public class Queue extends BaseDestinati
                                     // message may have expired.
                                     if (message.isExpired()) {
                                         LOG.error("expired waiting for space..");
-                                        broker.messageExpired(context, message);
+                                        broker.messageExpired(context, message, null);
                                         destinationStatistics.getExpired().increment();
                                     } else {
                                         doMessageSend(producerExchangeCopy, message);
@@ -644,7 +644,7 @@ public class Queue extends BaseDestinati
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Expired message: " + message);
                         }
-                        broker.getRoot().messageExpired(context, message);
+                        broker.getRoot().messageExpired(context, message, null);
                         return;
                     }
                 }
@@ -699,7 +699,7 @@ public class Queue extends BaseDestinati
                             // It could take while before we receive the commit
                             // op, by that time the message could have expired..
                             if (broker.isExpired(message)) {
-                                broker.messageExpired(context, message);
+                                broker.messageExpired(context, message, null);
                                 destinationStatistics.getExpired().increment();
                                 return;
                             }
@@ -1594,7 +1594,7 @@ public class Queue extends BaseDestinati
         if (LOG.isDebugEnabled()) {
             LOG.debug("message expired: " + reference);
         }
-        broker.messageExpired(context, reference);
+        broker.messageExpired(context, reference, subs);
         destinationStatistics.getExpired().increment();
         try {
             removeMessage(context, subs, (QueueMessageReference) reference);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Mon Mar 21 23:41:44 2011
@@ -806,16 +806,16 @@ public class RegionBroker extends EmptyB
 
     
     @Override
-    public void messageExpired(ConnectionContext context, MessageReference node) {
+    public void messageExpired(ConnectionContext context, MessageReference node, Subscription
subscription) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Message expired " + node);
         }
-        getRoot().sendToDeadLetterQueue(context, node);
+        getRoot().sendToDeadLetterQueue(context, node, null);
     }
     
     @Override
     public void sendToDeadLetterQueue(ConnectionContext context,
-	        MessageReference node){
+	        MessageReference node, Subscription subscription){
 		try{
 			if(node!=null){
 				Message message=node.getMessage();
@@ -838,8 +838,7 @@ public class RegionBroker extends EmptyB
 							// it is only populated if the message is routed to
 							// another destination like the DLQ
 							ActiveMQDestination deadLetterDestination=deadLetterStrategy
-							        .getDeadLetterQueueFor(message
-							                .getDestination());
+							        .getDeadLetterQueueFor(message, subscription);
 							if (context.getBroker()==null) {
 								context.setBroker(getRoot());
 							}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Mon Mar 21 23:41:44 2011
@@ -276,7 +276,7 @@ public class Topic extends BaseDestinati
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
         if (message.isExpired()) {
-            broker.messageExpired(context, message);
+            broker.messageExpired(context, message, null);
             getDestinationStatistics().getExpired().increment();
             if (sendProducerAck) {
                 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
@@ -322,7 +322,7 @@ public class Topic extends BaseDestinati
                                     // While waiting for space to free up... the
                                     // message may have expired.
                                     if (message.isExpired()) {
-                                        broker.messageExpired(context, message);
+                                        broker.messageExpired(context, message, null);
                                         getDestinationStatistics().getExpired().increment();
                                     } else {
                                         doMessageSend(producerExchange, message);
@@ -451,7 +451,7 @@ public class Topic extends BaseDestinati
                     // expired..
                     if (broker.isExpired(message)) {
                         getDestinationStatistics().getExpired().increment();
-                        broker.messageExpired(context, message);
+                        broker.messageExpired(context, message, null);
                         message.decrementReferenceCount();
                         return;
                     }
@@ -644,7 +644,7 @@ public class Topic extends BaseDestinati
     }
 
     public void messageExpired(ConnectionContext context, Subscription subs, MessageReference
reference) {
-        broker.messageExpired(context, reference);
+        broker.messageExpired(context, reference, subs);
         // AMQ-2586: Better to leave this stat at zero than to give the user
         // misleading metrics.
         // destinationStatistics.getMessages().decrement();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Mon Mar 21 23:41:44 2011
@@ -216,7 +216,7 @@ public class TopicSubscription extends A
                     dispatchedCounter.incrementAndGet();
                     node.decrementReferenceCount();
                     node.getRegionDestination().getDestinationStatistics().getExpired().increment();
-                    broker.messageExpired(getContext(), node);
+                    broker.messageExpired(getContext(), node, this);
                     break;
                 }
             }
@@ -543,7 +543,7 @@ public class TopicSubscription extends A
         if (dest != null) {
             dest.messageDiscarded(getContext(), this, message);
         }
-        broker.getRoot().sendToDeadLetterQueue(getContext(), message);
+        broker.getRoot().sendToDeadLetterQueue(getContext(), message, this);
     }
 
     @Override

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Mon Mar 21 23:41:44 2011
@@ -453,7 +453,8 @@ public class FilePendingMessageCursor ex
         if (LOG.isDebugEnabled()) {
             LOG.debug("Discarding message " + message);
         }
-        broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(new NonCachedMessageEvaluationContext()),
message);
+        broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(new NonCachedMessageEvaluationContext()),
+                message, null);
     }
 
     protected ByteSequence getByteSequence(Message message) throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
Mon Mar 21 23:41:44 2011
@@ -17,6 +17,9 @@
 package org.apache.activemq.broker.region.policy;
 
 import org.apache.activemq.ActiveMQMessageAudit;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
Mon Mar 21 23:41:44 2011
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 
@@ -35,10 +37,10 @@ public interface DeadLetterStrategy {
     boolean isSendToDeadLetterQueue(Message message);
 
     /**
-     * Returns the dead letter queue for the given destination.
+     * Returns the dead letter queue for the given message and subscription.
      */
-    ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination);
-    
+    ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription);
+
     /**
      * @return true if processes expired messages
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
Mon Mar 21 23:41:44 2011
@@ -16,9 +16,12 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
 
 /**
  * A {@link DeadLetterStrategy} where each destination has its own individual
@@ -33,12 +36,14 @@ public class IndividualDeadLetterStrateg
     private String queuePrefix = "ActiveMQ.DLQ.Queue.";
     private boolean useQueueForQueueMessages = true;
     private boolean useQueueForTopicMessages = true;
+    private boolean destinationPerDurableSubscriber;
 
-    public ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination)
{
-        if (originalDestination.isQueue()) {
-            return createDestination(originalDestination, queuePrefix, useQueueForQueueMessages);
+    public ActiveMQDestination getDeadLetterQueueFor(Message message,
+                                                     Subscription subscription) {
+        if (message.getDestination().isQueue()) {
+            return createDestination(message, queuePrefix, useQueueForQueueMessages, subscription);
         } else {
-            return createDestination(originalDestination, topicPrefix, useQueueForTopicMessages);
+            return createDestination(message, topicPrefix, useQueueForTopicMessages, subscription);
         }
     }
 
@@ -91,10 +96,30 @@ public class IndividualDeadLetterStrateg
         this.useQueueForTopicMessages = useQueueForTopicMessages;
     }
 
+    public boolean isDestinationPerDurableSubscriber() {
+        return destinationPerDurableSubscriber;
+    }
+
+    /**
+     * sets whether durable topic subscriptions are to get individual dead letter destinations.
+     * When true, the DLQ is of the form 'topicPrefix.clientId:subscriptionName'
+     * The default is false.
+     * @param destinationPerDurableSubscriber
+     */
+    public void setDestinationPerDurableSubscriber(boolean destinationPerDurableSubscriber)
{
+        this.destinationPerDurableSubscriber = destinationPerDurableSubscriber;
+    }
+
     // Implementation methods
     // -------------------------------------------------------------------------
-    protected ActiveMQDestination createDestination(ActiveMQDestination originalDestination,
String prefix, boolean useQueue) {
-        String name = prefix + originalDestination.getPhysicalName();
+    protected ActiveMQDestination createDestination(Message message,
+                                                    String prefix,
+                                                    boolean useQueue,
+                                                    Subscription subscription ) {
+        String name = prefix + message.getDestination().getPhysicalName();
+        if (destinationPerDurableSubscriber && subscription instanceof DurableTopicSubscription)
{
+            name += "." + ((DurableTopicSubscription)subscription).getSubscriptionKey();
+        }
         if (useQueue) {
             return new ActiveMQQueue(name);
         } else {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
Mon Mar 21 23:41:44 2011
@@ -16,8 +16,10 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.Message;
 
 /**
  * A default implementation of {@link DeadLetterStrategy} which uses
@@ -34,7 +36,7 @@ public class SharedDeadLetterStrategy ex
 
     private ActiveMQDestination deadLetterQueue = new ActiveMQQueue(DEFAULT_DEAD_LETTER_QUEUE_NAME);
 
-    public ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination)
{
+    public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription)
{
         return deadLetterQueue;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
Mon Mar 21 23:41:44 2011
@@ -474,7 +474,7 @@ public class LoggingBrokerPlugin extends
     }
 
     @Override
-    public void messageExpired(ConnectionContext context, MessageReference message) {
+    public void messageExpired(ConnectionContext context, MessageReference message, Subscription
subscription) {
         if (isLogAll() || isLogInternalEvents()) {
             String msg = "Unable to display message.";
 
@@ -482,11 +482,12 @@ public class LoggingBrokerPlugin extends
 
             LOG.info("Message has expired : " + msg);
         }
-        super.messageExpired(context, message);
+        super.messageExpired(context, message, subscription);
     }
 
     @Override
-    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference)
{
+    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
+                                      Subscription subscription) {
         if (isLogAll() || isLogInternalEvents()) {
             String msg = "Unable to display message.";
 
@@ -494,7 +495,7 @@ public class LoggingBrokerPlugin extends
 
             LOG.info("Sending to DLQ : " + msg);
         }
-        super.sendToDeadLetterQueue(context, messageReference);
+        super.sendToDeadLetterQueue(context, messageReference, subscription);
     }
 
     @Override

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
Mon Mar 21 23:41:44 2011
@@ -159,8 +159,8 @@ public class ActiveMQTextMessage extends
     public String toString() {
         try {
             String text = getText();
-            if (text != null && text.length() > 63) {
-                text = text.substring(0, 45) + "..." + text.substring(text.length() - 12);
+            if (text != null) {
+                text = MarshallingSupport.truncate64(text);
                 HashMap<String, Object> overrideFields = new HashMap<String, Object>();
                 overrideFields.put("text", text);
                 return super.toString(overrideFields);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java
Mon Mar 21 23:41:44 2011
@@ -21,6 +21,7 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.BrokerFilter;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.slf4j.Logger;
@@ -44,7 +45,8 @@ public class DiscardingDLQBroker extends
     }
 
     @Override
-    public void sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef) {
+    public void sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef,
+                                      Subscription subscription) {
         if (log.isTraceEnabled()) {
             log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef
!= null ? msgRef.getMessage() : null));
         }
@@ -73,7 +75,7 @@ public class DiscardingDLQBroker extends
             skipMessage("dropOnly",msgRef);
         } else {
             dropped = false;
-            next.sendToDeadLetterQueue(ctx, msgRef);
+            next.sendToDeadLetterQueue(ctx, msgRef, subscription);
         }
         if (dropped && getReportInterval()>0) {
             if ((++dropCount)%getReportInterval() == 0 ) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java?rev=1084023&r1=1084022&r2=1084023&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
Mon Mar 21 23:41:44 2011
@@ -66,12 +66,16 @@ public abstract class DeadLetterTestSupp
         broker = createBroker();
         broker.start();
         connection = createConnection();
-        connection.setClientID(toString());
+        connection.setClientID(createClientId());
 
         session = connection.createSession(transactedMode, acknowledgeMode);
         connection.start();
     }
 
+    protected String createClientId() {
+        return toString();
+    }
+
     protected void tearDown() throws Exception {
         if (connection != null) {
             connection.close();

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PerDurableConsumerDeadLetterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PerDurableConsumerDeadLetterTest.java?rev=1084023&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PerDurableConsumerDeadLetterTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PerDurableConsumerDeadLetterTest.java
Mon Mar 21 23:41:44 2011
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import java.util.Enumeration;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * for durable subs, allow a dlq per subscriber such that poison messages are not duplicates
+ * on the dlq and such that rejecting consumers can be identified
+ * https://issues.apache.org/jira/browse/AMQ-3003
+ */
+public class PerDurableConsumerDeadLetterTest extends DeadLetterTest {
+    private static final Logger LOG = LoggerFactory.getLogger(PerDurableConsumerDeadLetterTest.class);
+
+    private static final String CLIENT_ID = "george";
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+
+        PolicyEntry policy = new PolicyEntry();
+        IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
+        strategy.setProcessNonPersistent(true);
+        strategy.setDestinationPerDurableSubscriber(true);
+        policy.setDeadLetterStrategy(strategy);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        broker.setDestinationPolicy(pMap);
+
+        return broker;
+    }
+
+    protected String createClientId() {
+        return CLIENT_ID;
+    }
+
+    protected Destination createDlqDestination() {
+        String prefix = topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue.";
+        String destinationName = prefix + getClass().getName() + "." + getName();
+        if (durableSubscriber) {
+            String subName = // connectionId:SubName
+                CLIENT_ID + ":" + getDestination().toString();
+            destinationName += "." + subName ;
+        }
+        return new ActiveMQQueue(destinationName);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PerDurableConsumerDeadLetterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PerDurableConsumerDeadLetterTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message