activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r668146 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apache/activemq/broker/policy/ test/resources/org/apache/activemq/broker/policy/
Date Mon, 16 Jun 2008 12:57:30 GMT
Author: rajdavies
Date: Mon Jun 16 05:57:29 2008
New Revision: 668146

URL: http://svn.apache.org/viewvc?rev=668146&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1796

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/individual-dlq.xml

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=668146&r1=668145&r2=668146&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Mon Jun 16 05:57:29 2008
@@ -20,6 +20,7 @@
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.store.MessageStore;
@@ -37,6 +38,7 @@
      * from persistent storage
      */
     public static final int DEFAULT_PAGE_SIZE=100;
+   
     protected final ActiveMQDestination destination;
     protected final Broker broker;
     protected final MessageStore store;
@@ -59,6 +61,7 @@
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
     protected final BrokerService brokerService;
     protected final Broker regionBroker;
+    protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
     
     /**
      * @param broker 
@@ -299,6 +302,20 @@
     }
     
     /**
+     * @return the dead letter strategy
+     */
+    public DeadLetterStrategy getDeadLetterStrategy() {
+        return deadLetterStrategy;
+    }
+
+    /**
+     * set the dead letter strategy
+     * @param deadLetterStrategy
+     */
+    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
+        this.deadLetterStrategy = deadLetterStrategy;
+    }
+    /**
      * called when message is consumed
      * @param context
      * @param messageReference

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=668146&r1=668145&r2=668146&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Mon Jun 16 05:57:29 2008
@@ -22,6 +22,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -36,6 +37,7 @@
  */
 public interface Destination extends Service, Task {
 
+    public static final DeadLetterStrategy DEFAULT_DEAD_LETTER_STRATEGY = new SharedDeadLetterStrategy();
     void addSubscription(ConnectionContext context, Subscription sub) throws Exception;
 
     void removeSubscription(ConnectionContext context, Subscription sub) throws Exception;
@@ -114,7 +116,14 @@
      */
     public void setLazyDispatch(boolean value);
 
-    void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node);
+        
+    /**
+     * Inform the Destination a message has expired
+     * @param context
+     * @param subs 
+     * @param node
+     */
+    void messageExpired(ConnectionContext context, Subscription subs,MessageReference node);
 
     /**
      * called when message is consumed

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=668146&r1=668145&r2=668146&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Mon Jun 16 05:57:29 2008
@@ -238,4 +238,9 @@
     public void slowConsumer(ConnectionContext context, Subscription subs) {
        next.slowConsumer(context, subs);
     }
+
+   
+    public void messageExpired(ConnectionContext context, Subscription subs,MessageReference node) {
+       next.messageExpired(context,subs, node);    
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=668146&r1=668145&r2=668146&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Jun 16 05:57:29 2008
@@ -285,10 +285,7 @@
                 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
                     final MessageReference node = iter.next();
                     if( node.isExpired() ) {
-                        broker.messageExpired(getContext(), node);
                         node.getRegionDestination().messageExpired(context, this, node);
-                        node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
-                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                         dispatched.remove(node);
                     }
                     if (ack.getLastMessageId().equals(node.getMessageId())) {
@@ -517,7 +514,6 @@
                                 // Message may have been sitting in the pending
                                 // list a while waiting for the consumer to ak the message.
                                 if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
-                                    broker.messageExpired(getContext(), node);
                                     //increment number to dispatch
                                     numberToDispatch++;
                                     node.getRegionDestination().messageExpired(context, this, node);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=668146&r1=668145&r2=668146&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Jun 16 05:57:29 2008
@@ -41,10 +41,8 @@
 import org.apache.activemq.broker.region.group.MessageGroupMap;
 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
 import org.apache.activemq.broker.region.group.MessageGroupSet;
-import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
-import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ExceptionResponse;
@@ -85,7 +83,6 @@
     private final LinkedHashMap<MessageId,QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId,QueueMessageReference>();
     private MessageGroupMap messageGroupOwners;
     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
-    private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
     private final Object sendLock = new Object();
     private ExecutorService executor;
@@ -163,8 +160,7 @@
                         // Message could have expired while it was being
                         // loaded..
                         if (broker.isExpired(message)) {
-                            broker.messageExpired(createConnectionContext(), message);
-                            destinationStatistics.getMessages().decrement();
+                            messageExpired(createConnectionContext(), message);
                             return true;
                         }
                         if (hasSpace()) {
@@ -328,9 +324,8 @@
         final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
         final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
         if (message.isExpired()) {
+            //message not stored - or added to stats yet - so chuck here
             broker.getRoot().messageExpired(context, message);
-            //message not added to stats yet
-            //destinationStatistics.getMessages().decrement();
             if (sendProducerAck) {
                 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
                 context.getConnection().dispatchAsync(ack);
@@ -357,10 +352,8 @@
     
                                     // While waiting for space to free up... the
                                     // message may have expired.
-                                    if (broker.isExpired(message)) {
+                                    if (message.isExpired()) {
                                         broker.messageExpired(context, message);
-                                        //message not added to stats yet
-                                        //destinationStatistics.getMessages().decrement();
                                     } else {
                                         doMessageSend(producerExchange, message);
                                     }
@@ -570,14 +563,6 @@
         this.dispatchPolicy = dispatchPolicy;
     }
 
-    public DeadLetterStrategy getDeadLetterStrategy() {
-        return deadLetterStrategy;
-    }
-
-    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
-        this.deadLetterStrategy = deadLetterStrategy;
-    }
-
     public MessageGroupMapFactory getMessageGroupMapFactory() {
         return messageGroupMapFactory;
     }
@@ -1005,11 +990,15 @@
     }
 
     protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
+        removeMessage(c, null, r);
+    }
+    
+    protected void removeMessage(ConnectionContext c, Subscription subs,QueueMessageReference r) throws IOException {
         MessageAck ack = new MessageAck();
         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
         ack.setDestination(destination);
         ack.setMessageID(r.getMessageId());
-        removeMessage(c, null, r, ack);
+        removeMessage(c, subs, r, ack);
     }
     
     protected void removeMessage(ConnectionContext context,Subscription sub,final QueueMessageReference reference,MessageAck ack) throws IOException {
@@ -1044,11 +1033,19 @@
 
     }
     
-    public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference reference) {
-        ((QueueMessageReference)reference).drop();
-        // Not sure.. perhaps we should forge an ack to remove the message from the store.
-        // acknowledge(context, sub, ack, reference);
-        destinationStatistics.getMessages().decrement();
+    public void messageExpired(ConnectionContext context,MessageReference reference) {
+        messageExpired(context,null,reference);
+    }
+    
+    public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
+        broker.messageExpired(context, reference);
+        destinationStatistics.getDequeues().increment();
+        destinationStatistics.getInflight().decrement();
+        try {
+            removeMessage(context,subs,(QueueMessageReference)reference);
+        } catch (IOException e) {
+            LOG.error("Failed to remove expired Message from the store ",e);
+        }
         synchronized(pagedInMessages) {
             pagedInMessages.remove(reference.getMessageId());
         }
@@ -1113,9 +1110,7 @@
                                 result.add(ref);
                                 count++;
                             } else {
-                                broker.messageExpired(createConnectionContext(),
-                                        node);
-                                destinationStatistics.getMessages().decrement();
+                                messageExpired(createConnectionContext(), node);
                             }
                         }
                     } finally {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=668146&r1=668145&r2=668146&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Jun 16 05:57:29 2008
@@ -71,7 +71,6 @@
     private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
     private boolean sendAdvisoryIfNoConsumers;
-    private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
     private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
     private final TaskRunner taskRunner;
     private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
@@ -266,9 +265,8 @@
 
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
-        if (broker.isExpired(message)) {
+        if (message.isExpired()) {
             broker.messageExpired(context, message);
-            destinationStatistics.getMessages().decrement();
             if (sendProducerAck) {
                 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
                 context.getConnection().dispatchAsync(ack);
@@ -296,10 +294,8 @@
     
                                     // While waiting for space to free up... the
                                     // message may have expired.
-                                    if (broker.isExpired(message)) {
+                                    if (message.isExpired()) {
                                         broker.messageExpired(context, message);
-                                        //destinationStatistics.getEnqueues().increment();
-                                        //destinationStatistics.getMessages().decrement();
                                     } else {
                                         doMessageSend(producerExchange, message);
                                     }
@@ -413,8 +409,6 @@
                     if (broker.isExpired(message)) {
                         broker.messageExpired(context, message);
                         message.decrementReferenceCount();
-                        //destinationStatistics.getEnqueues().increment();
-                        //destinationStatistics.getMessages().decrement();
                         return;
                     }
                     try {
@@ -555,14 +549,6 @@
         this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
     }
 
-    public DeadLetterStrategy getDeadLetterStrategy() {
-        return deadLetterStrategy;
-    }
-
-    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
-        this.deadLetterStrategy = deadLetterStrategy;
-    }
-
     
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -595,6 +581,21 @@
             dispatchValve.decrement();
         }
     }
+    
+    public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
+        broker.messageExpired(context, reference);
+        destinationStatistics.getMessages().decrement();
+        destinationStatistics.getEnqueues().decrement();
+        MessageAck ack = new MessageAck();
+        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+        ack.setDestination(destination);
+        ack.setMessageID(reference.getMessageId());
+        try {
+            acknowledge(context, subs, ack, reference);
+        } catch (IOException e) {
+            LOG.error("Failed to remove expired Message from the store ",e);
+        }
+    }
 
     /**
      * Provides a hook to allow messages with no consumer to be processed in
@@ -640,10 +641,4 @@
             }
         }
     }
-
-    public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) {
-        // TODO Auto-generated method stub
-        
-    }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java?rev=668146&r1=668145&r2=668146&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java Mon Jun 16 05:57:29 2008
@@ -38,5 +38,25 @@
      * Returns the dead letter queue for the given destination.
      */
     ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination);
+    
+    /**
+     * @return true if processes expired messages
+     */
+    public boolean isProcessExpired() ;
+
+    /**
+     * @param processExpired the processExpired to set
+     */
+    public void setProcessExpired(boolean processExpired);
+
+    /**
+     * @return the processNonPersistent
+     */
+    public boolean isProcessNonPersistent();
+
+    /**
+     * @param processNonPersistent the processNonPersistent to set
+     */
+    public void setProcessNonPersistent(boolean processNonPersistent);
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=668146&r1=668145&r2=668146&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Mon Jun 16 05:57:29 2008
@@ -18,6 +18,7 @@
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.Topic;
@@ -43,7 +44,7 @@
     private DispatchPolicy dispatchPolicy;
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
     private boolean sendAdvisoryIfNoConsumers;
-    private DeadLetterStrategy deadLetterStrategy;
+    private DeadLetterStrategy deadLetterStrategy = Destination.DEFAULT_DEAD_LETTER_STRATEGY;
     private PendingMessageLimitStrategy pendingMessageLimitStrategy;
     private MessageEvictionStrategy messageEvictionStrategy;
     private long memoryLimit;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java?rev=668146&r1=668145&r2=668146&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java Mon Jun 16 05:57:29 2008
@@ -29,6 +29,9 @@
 
 import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -78,6 +81,14 @@
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = new BrokerService();
         broker.setPersistent(false);
+        PolicyEntry policy = new PolicyEntry();
+        DeadLetterStrategy defaultDeadLetterStrategy = policy.getDeadLetterStrategy();
+        if(defaultDeadLetterStrategy!=null) {
+            defaultDeadLetterStrategy.setProcessNonPersistent(true);
+        }
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
         return broker;
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java?rev=668146&r1=668145&r2=668146&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java Mon Jun 16 05:57:29 2008
@@ -19,6 +19,7 @@
 import javax.jms.Destination;
 
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -33,7 +34,9 @@
         BrokerService broker = super.createBroker();
 
         PolicyEntry policy = new PolicyEntry();
-        policy.setDeadLetterStrategy(new IndividualDeadLetterStrategy());
+        DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
+        strategy.setProcessNonPersistent(true);
+        policy.setDeadLetterStrategy(strategy);
 
         PolicyMap pMap = new PolicyMap();
         pMap.setDefaultEntry(policy);

Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/individual-dlq.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/individual-dlq.xml?rev=668146&r1=668145&r2=668146&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/individual-dlq.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/individual-dlq.xml Mon Jun 16 05:57:29 2008
@@ -32,7 +32,7 @@
               <strictOrderDispatchPolicy />
             </dispatchPolicy>
             <deadLetterStrategy>
-              <individualDeadLetterStrategy  topicPrefix="Test.DLQ." />
+              <individualDeadLetterStrategy  topicPrefix="Test.DLQ." processNonPersistent="true" />
             </deadLetterStrategy>
           </policyEntry>
 
@@ -41,7 +41,7 @@
               <strictOrderDispatchPolicy />
             </dispatchPolicy>
             <deadLetterStrategy>
-              <individualDeadLetterStrategy queuePrefix="Test.DLQ."/>
+              <individualDeadLetterStrategy queuePrefix="Test.DLQ." processNonPersistent="true"/>
             </deadLetterStrategy>
           </policyEntry>
 



Mime
View raw message