activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1480087 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/advisory/ activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-broker/src/main/j...
Date Tue, 07 May 2013 21:17:34 GMT
Author: tabish
Date: Tue May  7 21:17:34 2013
New Revision: 1480087

URL: http://svn.apache.org/r1480087
Log:
Fix for: https://issues.apache.org/jira/browse/AMQ-4517

Don't send MessageDLQd advisory for Messages that aren't sent to a DLQ

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java   (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1480087&r1=1480086&r2=1480087&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Tue May  7 21:17:34 2013
@@ -408,19 +408,23 @@ public class AdvisoryBroker extends Brok
     }
 
     @Override
-    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
-                                      Subscription subscription){
-        super.sendToDeadLetterQueue(context, messageReference, subscription);
-        try {
-            if(!messageReference.isAdvisory()) {
-                ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
-                Message payload = messageReference.getMessage().copy();
-                payload.clearBody();
-                fireAdvisory(context, topic,payload);
+    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
+                                         Subscription subscription) {
+        boolean wasDLQd = super.sendToDeadLetterQueue(context, messageReference, subscription);
+        if (wasDLQd) {
+            try {
+                if(!messageReference.isAdvisory()) {
+                    ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
+                    Message payload = messageReference.getMessage().copy();
+                    payload.clearBody();
+                    fireAdvisory(context, topic,payload);
+                }
+            } catch (Exception e) {
+                handleFireFailure("add to DLQ", e);
             }
-        } catch (Exception e) {
-            handleFireFailure("add to DLQ", e);
         }
+
+        return wasDLQd;
     }
 
     @Override

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java?rev=1480087&r1=1480086&r2=1480087&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java Tue May  7 21:17:34 2013
@@ -19,6 +19,7 @@ package org.apache.activemq.broker;
 import java.net.URI;
 import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
+
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
@@ -40,14 +41,14 @@ import org.apache.activemq.usage.Usage;
 /**
  * The Message Broker which routes messages, maintains subscriptions and
  * connections, acknowledges messages and handles transactions.
- * 
- * 
+ *
+ *
  */
 public interface Broker extends Region, Service {
 
     /**
      * Get a Broker from the Broker Stack that is a particular class
-     * 
+     *
      * @param type
      * @return
      */
@@ -70,7 +71,7 @@ public interface Broker extends Region, 
 
     /**
      * Remove a BrokerInfo
-     * 
+     *
      * @param connection
      * @param info
      */
@@ -78,14 +79,14 @@ public interface Broker extends Region, 
 
     /**
      * A client is establishing a connection with the broker.
-     * 
+     *
      * @throws Exception TODO
      */
     void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception;
 
     /**
      * A client is disconnecting from the broker.
-     * 
+     *
      * @param context the environment the operation is being executed under.
      * @param info
      * @param error null if the client requested the disconnect or the error
@@ -96,7 +97,7 @@ public interface Broker extends Region, 
 
     /**
      * Adds a session.
-     * 
+     *
      * @param context
      * @param info
      * @throws Exception TODO
@@ -105,7 +106,7 @@ public interface Broker extends Region, 
 
     /**
      * Removes a session.
-     * 
+     *
      * @param context
      * @param info
      * @throws Exception TODO
@@ -114,18 +115,20 @@ public interface Broker extends Region, 
 
     /**
      * Adds a producer.
-     * 
+     *
      * @param context the enviorment the operation is being executed under.
      * @throws Exception TODO
      */
+    @Override
     void addProducer(ConnectionContext context, ProducerInfo info) throws Exception;
 
     /**
      * Removes a producer.
-     * 
+     *
      * @param context the enviorment the operation is being executed under.
      * @throws Exception TODO
      */
+    @Override
     void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception;
 
     /**
@@ -142,7 +145,7 @@ public interface Broker extends Region, 
 
     /**
      * Gets a list of all the prepared xa transactions.
-     * 
+     *
      * @param context transaction ids
      * @return
      * @throws Exception TODO
@@ -151,7 +154,7 @@ public interface Broker extends Region, 
 
     /**
      * Starts a transaction.
-     * 
+     *
      * @param context
      * @param xid
      * @throws Exception TODO
@@ -160,7 +163,7 @@ public interface Broker extends Region, 
 
     /**
      * Prepares a transaction. Only valid for xa transactions.
-     * 
+     *
      * @param context
      * @param xid
      * @return id
@@ -170,7 +173,7 @@ public interface Broker extends Region, 
 
     /**
      * Rollsback a transaction.
-     * 
+     *
      * @param context
      * @param xid
      * @throws Exception TODO
@@ -180,7 +183,7 @@ public interface Broker extends Region, 
 
     /**
      * Commits a transaction.
-     * 
+     *
      * @param context
      * @param xid
      * @param onePhase
@@ -190,7 +193,7 @@ public interface Broker extends Region, 
 
     /**
      * Forgets a transaction.
-     * 
+     *
      * @param context
      * @param transactionId
      * @throws Exception
@@ -199,21 +202,21 @@ public interface Broker extends Region, 
 
     /**
      * Get the BrokerInfo's of any connected Brokers
-     * 
+     *
      * @return array of peer BrokerInfos
      */
     BrokerInfo[] getPeerBrokerInfos();
 
     /**
      * Notify the Broker that a dispatch is going to happen
-     * 
+     *
      * @param messageDispatch
      */
     void preProcessDispatch(MessageDispatch messageDispatch);
 
     /**
      * Notify the Broker that a dispatch has happened
-     * 
+     *
      * @param messageDispatch
      */
     void postProcessDispatch(MessageDispatch messageDispatch);
@@ -230,7 +233,7 @@ public interface Broker extends Region, 
 
     /**
      * Add and process a DestinationInfo object
-     * 
+     *
      * @param context
      * @param info
      * @throws Exception
@@ -239,7 +242,7 @@ public interface Broker extends Region, 
 
     /**
      * Remove and process a DestinationInfo object
-     * 
+     *
      * @param context
      * @param info
      * @throws Exception
@@ -260,7 +263,7 @@ public interface Broker extends Region, 
     /**
      * Sets the default administration connection context used when configuring
      * the broker on startup or via JMX
-     * 
+     *
      * @param adminConnectionContext
      */
     void setAdminConnectionContext(ConnectionContext adminConnectionContext);
@@ -287,7 +290,7 @@ public interface Broker extends Region, 
 
     /**
      * Ensure we get the Broker at the top of the Stack
-     * 
+     *
      * @return the broker at the top of the Stack
      */
     Broker getRoot();
@@ -296,7 +299,7 @@ public interface Broker extends Region, 
      * Determine if a message has expired -allows default behaviour to be
      * overriden - as the timestamp set by the producer can be out of sync with
      * the broker
-     * 
+     *
      * @param messageReference
      * @return true if the message is expired
      */
@@ -313,49 +316,51 @@ public interface Broker extends Region, 
 
     /**
      * A message needs to go the a DLQ
-     * 
+     *
      * @param context
      * @param messageReference
      * @param subscription, may be null
+     *
+     * @return true if Message was placed in a DLQ false if discarded.
      */
-    void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription);
-    
+    boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription);
+
     /**
      * @return the broker sequence id
      */
     long getBrokerSequenceId();
-    
+
     /**
      * called when message is consumed
      * @param context
      * @param messageReference
      */
     void messageConsumed(ConnectionContext context, MessageReference messageReference);
-    
+
     /**
      * Called when message is delivered to the broker
      * @param context
      * @param messageReference
      */
     void messageDelivered(ConnectionContext context, MessageReference messageReference);
-    
+
     /**
      * Called when a message is discarded - e.g. running low on memory
      * This will happen only if the policy is enabled - e.g. non durable topics
      * @param context
-     * @param sub 
+     * @param sub
      * @param messageReference
      */
     void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference);
-    
+
     /**
      * Called when there is a slow consumer
      * @param context
-     * @param destination 
+     * @param destination
      * @param subs
      */
     void slowConsumer(ConnectionContext context,Destination destination, Subscription subs);
-    
+
     /**
      * Called to notify a producer is too fast
      * @param context
@@ -363,23 +368,23 @@ public interface Broker extends Region, 
      * @param destination
      */
     void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination);
-    
+
     /**
      * Called when a Usage reaches a limit
      * @param context
-     * @param destination 
+     * @param destination
      * @param usage
      */
     void isFull(ConnectionContext context,Destination destination,Usage usage);
-    
+
     /**
      *  called when the broker becomes the master in a master/slave
      *  configuration
      */
     void nowMasterBroker();
-    
+
     Scheduler getScheduler();
-    
+
     ThreadPoolExecutor getExecutor();
 
     void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp);

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=1480087&r1=1480086&r2=1480087&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java Tue May  7 21:17:34 2013
@@ -20,6 +20,7 @@ import java.net.URI;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
+
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
@@ -47,8 +48,8 @@ import org.apache.activemq.usage.Usage;
 /**
  * Allows you to intercept broker operation so that features such as security
  * can be implemented as a pluggable filter.
- * 
- * 
+ *
+ *
  */
 public class BrokerFilter implements Broker {
 
@@ -58,6 +59,7 @@ public class BrokerFilter implements Bro
         this.next = next;
     }
 
+    @Override
     public Broker getAdaptor(Class type) {
         if (type.isInstance(this)) {
             return this;
@@ -65,257 +67,320 @@ public class BrokerFilter implements Bro
         return next.getAdaptor(type);
     }
 
+    @Override
     public Map<ActiveMQDestination, Destination> getDestinationMap() {
         return next.getDestinationMap();
     }
 
+    @Override
     public Set <Destination>getDestinations(ActiveMQDestination destination) {
         return next.getDestinations(destination);
     }
 
+    @Override
     public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
         next.acknowledge(consumerExchange, ack);
     }
 
+    @Override
     public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
         return next.messagePull(context, pull);
     }
 
+    @Override
     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
         next.addConnection(context, info);
     }
 
+    @Override
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         return next.addConsumer(context, info);
     }
 
+    @Override
     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         next.addProducer(context, info);
     }
 
+    @Override
     public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
         next.commitTransaction(context, xid, onePhase);
     }
 
+    @Override
     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
         next.removeSubscription(context, info);
     }
 
+    @Override
     public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
         return next.getPreparedTransactions(context);
     }
 
+    @Override
     public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
         return next.prepareTransaction(context, xid);
     }
 
+    @Override
     public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
         next.removeConnection(context, info, error);
     }
 
+    @Override
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         next.removeConsumer(context, info);
     }
 
+    @Override
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         next.removeProducer(context, info);
     }
 
+    @Override
     public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
         next.rollbackTransaction(context, xid);
     }
 
+    @Override
     public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
         next.send(producerExchange, messageSend);
     }
 
+    @Override
     public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
         next.beginTransaction(context, xid);
     }
 
+    @Override
     public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
         next.forgetTransaction(context, transactionId);
     }
 
+    @Override
     public Connection[] getClients() throws Exception {
         return next.getClients();
     }
 
+    @Override
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
         return next.addDestination(context, destination,createIfTemporary);
     }
 
+    @Override
     public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
         next.removeDestination(context, destination, timeout);
     }
 
+    @Override
     public ActiveMQDestination[] getDestinations() throws Exception {
         return next.getDestinations();
     }
 
+    @Override
     public void start() throws Exception {
         next.start();
     }
 
+    @Override
     public void stop() throws Exception {
         next.stop();
     }
 
+    @Override
     public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
         next.addSession(context, info);
     }
 
+    @Override
     public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
         next.removeSession(context, info);
     }
 
+    @Override
     public BrokerId getBrokerId() {
         return next.getBrokerId();
     }
 
+    @Override
     public String getBrokerName() {
         return next.getBrokerName();
     }
 
+    @Override
     public void gc() {
         next.gc();
     }
 
+    @Override
     public void addBroker(Connection connection, BrokerInfo info) {
         next.addBroker(connection, info);
     }
 
+    @Override
     public void removeBroker(Connection connection, BrokerInfo info) {
         next.removeBroker(connection, info);
     }
 
+    @Override
     public BrokerInfo[] getPeerBrokerInfos() {
         return next.getPeerBrokerInfos();
     }
 
+    @Override
     public void preProcessDispatch(MessageDispatch messageDispatch) {
         next.preProcessDispatch(messageDispatch);
     }
 
+    @Override
     public void postProcessDispatch(MessageDispatch messageDispatch) {
         next.postProcessDispatch(messageDispatch);
     }
 
+    @Override
     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
         next.processDispatchNotification(messageDispatchNotification);
     }
 
+    @Override
     public boolean isStopped() {
         return next.isStopped();
     }
 
+    @Override
     public Set<ActiveMQDestination> getDurableDestinations() {
         return next.getDurableDestinations();
     }
 
+    @Override
     public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
         next.addDestinationInfo(context, info);
     }
 
+    @Override
     public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
         next.removeDestinationInfo(context, info);
     }
 
+    @Override
     public boolean isFaultTolerantConfiguration() {
         return next.isFaultTolerantConfiguration();
     }
 
+    @Override
     public ConnectionContext getAdminConnectionContext() {
         return next.getAdminConnectionContext();
     }
 
+    @Override
     public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
         next.setAdminConnectionContext(adminConnectionContext);
     }
 
+    @Override
     public PListStore getTempDataStore() {
         return next.getTempDataStore();
     }
 
+    @Override
     public URI getVmConnectorURI() {
         return next.getVmConnectorURI();
     }
 
+    @Override
     public void brokerServiceStarted() {
         next.brokerServiceStarted();
     }
 
+    @Override
     public BrokerService getBrokerService() {
         return next.getBrokerService();
     }
 
+    @Override
     public boolean isExpired(MessageReference messageReference) {
         return next.isExpired(messageReference);
     }
 
+    @Override
     public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
         next.messageExpired(context, message, subscription);
     }
 
-    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
+    @Override
+    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
                                       Subscription subscription) {
-        next.sendToDeadLetterQueue(context, messageReference, subscription);
+        return next.sendToDeadLetterQueue(context, messageReference, subscription);
     }
 
+    @Override
     public Broker getRoot() {
         return next.getRoot();
     }
 
+    @Override
     public long getBrokerSequenceId() {
         return next.getBrokerSequenceId();
     }
 
-   
+
+    @Override
     public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
         next.fastProducer(context, producerInfo, destination);
     }
 
+    @Override
     public void isFull(ConnectionContext context,Destination destination, Usage usage) {
         next.isFull(context,destination, usage);
     }
 
+    @Override
     public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
         next.messageConsumed(context, messageReference);
     }
 
+    @Override
     public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
         next.messageDelivered(context, messageReference);
     }
 
+    @Override
     public void messageDiscarded(ConnectionContext context,Subscription sub, MessageReference messageReference) {
         next.messageDiscarded(context, sub, messageReference);
     }
 
+    @Override
     public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
         next.slowConsumer(context, destination,subs);
     }
-    
-    public void nowMasterBroker() {   
+
+    @Override
+    public void nowMasterBroker() {
         next.nowMasterBroker();
     }
 
+    @Override
     public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
             ConsumerControl control) {
         next.processConsumerControl(consumerExchange, control);
     }
 
+    @Override
     public Scheduler getScheduler() {
        return next.getScheduler();
     }
 
+    @Override
     public ThreadPoolExecutor getExecutor() {
        return next.getExecutor();
     }
 
+    @Override
     public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
         next.networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp);
     }
 
+    @Override
     public void networkBridgeStopped(BrokerInfo brokerInfo) {
         next.networkBridgeStopped(brokerInfo);
     }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=1480087&r1=1480086&r2=1480087&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java Tue May  7 21:17:34 2013
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
+
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
@@ -47,19 +48,22 @@ import org.apache.activemq.usage.Usage;
 
 /**
  * Dumb implementation - used to be overriden by listeners
- * 
- * 
+ *
+ *
  */
 public class EmptyBroker implements Broker {
 
+    @Override
     public BrokerId getBrokerId() {
         return null;
     }
 
+    @Override
     public String getBrokerName() {
         return null;
     }
 
+    @Override
     public Broker getAdaptor(Class type) {
         if (type.isInstance(this)) {
             return this;
@@ -67,237 +71,298 @@ public class EmptyBroker implements Brok
         return null;
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public Map<ActiveMQDestination, Destination> getDestinationMap() {
         return Collections.EMPTY_MAP;
     }
 
+    @Override
     public Set getDestinations(ActiveMQDestination destination) {
         return Collections.EMPTY_SET;
     }
 
+    @Override
     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
 
     }
 
+    @Override
     public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
 
     }
 
+    @Override
     public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
 
     }
 
+    @Override
     public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
 
     }
 
+    @Override
     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
 
     }
 
+    @Override
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
 
     }
 
+    @Override
     public Connection[] getClients() throws Exception {
 
         return null;
     }
 
+    @Override
     public ActiveMQDestination[] getDestinations() throws Exception {
 
         return null;
     }
 
+    @Override
     public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
 
         return null;
     }
 
+    @Override
     public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
 
     }
 
+    @Override
     public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
 
         return 0;
     }
 
+    @Override
     public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
 
     }
 
+    @Override
     public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
 
     }
 
+    @Override
     public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
 
     }
 
+    @Override
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean flag) throws Exception {
 
         return null;
     }
 
+    @Override
     public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
 
     }
 
+    @Override
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         return null;
     }
 
+    @Override
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
 
     }
 
+    @Override
     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
 
     }
 
+    @Override
     public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
 
     }
 
+    @Override
     public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
 
     }
 
+    @Override
     public void gc() {
 
     }
 
+    @Override
     public void start() throws Exception {
 
     }
 
+    @Override
     public void stop() throws Exception {
 
     }
 
+    @Override
     public void addBroker(Connection connection, BrokerInfo info) {
 
     }
 
+    @Override
     public void removeBroker(Connection connection, BrokerInfo info) {
 
     }
 
+    @Override
     public BrokerInfo[] getPeerBrokerInfos() {
         return null;
     }
 
+    @Override
     public void preProcessDispatch(MessageDispatch messageDispatch) {
     }
 
+    @Override
     public void postProcessDispatch(MessageDispatch messageDispatch) {
     }
 
+    @Override
     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
 
     }
 
+    @Override
     public boolean isStopped() {
         return false;
     }
 
+    @Override
     public Set<ActiveMQDestination> getDurableDestinations() {
         return null;
     }
 
+    @Override
     public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
     }
 
+    @Override
     public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
     }
 
+    @Override
     public boolean isFaultTolerantConfiguration() {
         return false;
     }
 
+    @Override
     public ConnectionContext getAdminConnectionContext() {
         return null;
     }
 
+    @Override
     public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
     }
 
+    @Override
     public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
         return null;
     }
 
+    @Override
     public PListStore getTempDataStore() {
         return null;
     }
 
+    @Override
     public URI getVmConnectorURI() {
         return null;
     }
 
+    @Override
     public void brokerServiceStarted() {
     }
 
+    @Override
     public BrokerService getBrokerService() {
         return null;
     }
 
+    @Override
     public boolean isExpired(MessageReference messageReference) {
         return false;
     }
 
+    @Override
     public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
     }
 
-    public void sendToDeadLetterQueue(ConnectionContext context,
-                                      MessageReference messageReference,
-                                      Subscription subscription) {
+    @Override
+    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
+                                         Subscription subscription) {
+        return false;
     }
 
+    @Override
     public Broker getRoot() {
         return null;
     }
-    
+
+    @Override
     public long getBrokerSequenceId() {
         return -1l;
     }
-    
+
+    @Override
     public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
     }
 
+    @Override
     public void isFull(ConnectionContext context, Destination destination,Usage usage) {
     }
 
+    @Override
     public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
     }
 
+    @Override
     public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
     }
 
+    @Override
     public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
     }
 
+    @Override
     public void slowConsumer(ConnectionContext context,Destination destination, Subscription subs) {
     }
 
-    public void nowMasterBroker() {        
+    @Override
+    public void nowMasterBroker() {
     }
 
+    @Override
     public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
     }
 
+    @Override
     public void networkBridgeStopped(BrokerInfo brokerInfo) {
     }
 
+    @Override
     public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
-            ConsumerControl control) {     
+            ConsumerControl control) {
     }
 
+    @Override
     public Scheduler getScheduler() {
         return null;
     }
 
+    @Override
     public ThreadPoolExecutor getExecutor() {
         return null;
     }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=1480087&r1=1480086&r2=1480087&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java Tue May  7 21:17:34 2013
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
+
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
@@ -48,8 +49,8 @@ import org.apache.activemq.usage.Usage;
 /**
  * Implementation of the broker where all it's methods throw an
  * BrokerStoppedException.
- * 
- * 
+ *
+ *
  */
 public class ErrorBroker implements Broker {
 
@@ -59,15 +60,18 @@ public class ErrorBroker implements Brok
         this.message = message;
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public Map<ActiveMQDestination, Destination> getDestinationMap() {
         return Collections.EMPTY_MAP;
     }
 
+    @Override
     public Set getDestinations(ActiveMQDestination destination) {
         return Collections.EMPTY_SET;
     }
 
+    @Override
     public Broker getAdaptor(Class type) {
         if (type.isInstance(this)) {
             return this;
@@ -75,249 +79,310 @@ public class ErrorBroker implements Brok
         return null;
     }
 
+    @Override
     public BrokerId getBrokerId() {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public String getBrokerName() {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public Connection[] getClients() throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public ActiveMQDestination[] getDestinations() throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean flag) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void gc() {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void start() throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void stop() throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void addBroker(Connection connection, BrokerInfo info) {
         throw new BrokerStoppedException(this.message);
 
     }
 
+    @Override
     public void removeBroker(Connection connection, BrokerInfo info) {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public BrokerInfo[] getPeerBrokerInfos() {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void preProcessDispatch(MessageDispatch messageDispatch) {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void postProcessDispatch(MessageDispatch messageDispatch) {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public boolean isStopped() {
         return true;
     }
 
+    @Override
     public Set<ActiveMQDestination> getDurableDestinations() {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public boolean isFaultTolerantConfiguration() {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public ConnectionContext getAdminConnectionContext() {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public Response messagePull(ConnectionContext context, MessagePull pull) {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public PListStore getTempDataStore() {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public URI getVmConnectorURI() {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void brokerServiceStarted() {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public BrokerService getBrokerService() {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public boolean isExpired(MessageReference messageReference) {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
         throw new BrokerStoppedException(this.message);
     }
 
-    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
-                                      Subscription subscription) {
+    @Override
+    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
+                                         Subscription subscription) {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public Broker getRoot() {
         throw new BrokerStoppedException(this.message);
     }
-    
+
+    @Override
     public long getBrokerSequenceId() {
         throw new BrokerStoppedException(this.message);
     }
-    
+
+    @Override
     public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void isFull(ConnectionContext context,Destination destination, Usage usage) {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
         throw new BrokerStoppedException(this.message);
     }
-    
-    public void nowMasterBroker() {   
+
+    @Override
+    public void nowMasterBroker() {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
             ConsumerControl control) {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public Scheduler getScheduler() {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public ThreadPoolExecutor getExecutor() {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
     public void networkBridgeStopped(BrokerInfo brokerInfo) {
         throw new BrokerStoppedException(this.message);
     }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=1480087&r1=1480086&r2=1480087&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Tue May  7 21:17:34 2013
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
@@ -49,8 +50,8 @@ import org.apache.activemq.usage.Usage;
  * Like a BrokerFilter but it allows you to switch the getNext().broker. This
  * has more overhead than a BrokerFilter since access to the getNext().broker
  * has to synchronized since it is mutable
- * 
- * 
+ *
+ *
  */
 public class MutableBrokerFilter implements Broker {
 
@@ -60,6 +61,7 @@ public class MutableBrokerFilter impleme
         this.next.set(next);
     }
 
+    @Override
     public Broker getAdaptor(Class type) {
         if (type.isInstance(this)) {
             return this;
@@ -72,261 +74,324 @@ public class MutableBrokerFilter impleme
     }
 
     public void setNext(Broker next) {
-    	this.next.set(next);
+        this.next.set(next);
     }
 
+    @Override
     public Map<ActiveMQDestination, Destination> getDestinationMap() {
         return getNext().getDestinationMap();
     }
 
+    @Override
     public Set getDestinations(ActiveMQDestination destination) {
         return getNext().getDestinations(destination);
     }
 
+    @Override
     public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
         getNext().acknowledge(consumerExchange, ack);
     }
 
+    @Override
     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
         getNext().addConnection(context, info);
     }
 
+    @Override
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         return getNext().addConsumer(context, info);
     }
 
+    @Override
     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         getNext().addProducer(context, info);
     }
 
+    @Override
     public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
         getNext().commitTransaction(context, xid, onePhase);
     }
 
+    @Override
     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
         getNext().removeSubscription(context, info);
     }
 
+    @Override
     public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
         return getNext().getPreparedTransactions(context);
     }
 
+    @Override
     public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
         return getNext().prepareTransaction(context, xid);
     }
 
+    @Override
     public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
         getNext().removeConnection(context, info, error);
     }
 
+    @Override
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         getNext().removeConsumer(context, info);
     }
 
+    @Override
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         getNext().removeProducer(context, info);
     }
 
+    @Override
     public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
         getNext().rollbackTransaction(context, xid);
     }
 
+    @Override
     public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
         getNext().send(producerExchange, messageSend);
     }
 
+    @Override
     public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
         getNext().beginTransaction(context, xid);
     }
 
+    @Override
     public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
         getNext().forgetTransaction(context, transactionId);
     }
 
+    @Override
     public Connection[] getClients() throws Exception {
         return getNext().getClients();
     }
 
+    @Override
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
         return getNext().addDestination(context, destination,createIfTemporary);
     }
 
+    @Override
     public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
         getNext().removeDestination(context, destination, timeout);
     }
 
+    @Override
     public ActiveMQDestination[] getDestinations() throws Exception {
         return getNext().getDestinations();
     }
 
+    @Override
     public void start() throws Exception {
         getNext().start();
     }
 
+    @Override
     public void stop() throws Exception {
         getNext().stop();
     }
 
+    @Override
     public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
         getNext().addSession(context, info);
     }
 
+    @Override
     public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
         getNext().removeSession(context, info);
     }
 
+    @Override
     public BrokerId getBrokerId() {
         return getNext().getBrokerId();
     }
 
+    @Override
     public String getBrokerName() {
         return getNext().getBrokerName();
     }
 
+    @Override
     public void gc() {
         getNext().gc();
     }
 
+    @Override
     public void addBroker(Connection connection, BrokerInfo info) {
         getNext().addBroker(connection, info);
     }
 
+    @Override
     public void removeBroker(Connection connection, BrokerInfo info) {
         getNext().removeBroker(connection, info);
     }
 
+    @Override
     public BrokerInfo[] getPeerBrokerInfos() {
         return getNext().getPeerBrokerInfos();
     }
 
+    @Override
     public void preProcessDispatch(MessageDispatch messageDispatch) {
         getNext().preProcessDispatch(messageDispatch);
     }
 
+    @Override
     public void postProcessDispatch(MessageDispatch messageDispatch) {
         getNext().postProcessDispatch(messageDispatch);
     }
 
+    @Override
     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
         getNext().processDispatchNotification(messageDispatchNotification);
     }
 
+    @Override
     public boolean isStopped() {
         return getNext().isStopped();
     }
 
+    @Override
     public Set<ActiveMQDestination> getDurableDestinations() {
         return getNext().getDurableDestinations();
     }
 
+    @Override
     public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
         getNext().addDestinationInfo(context, info);
 
     }
 
+    @Override
     public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
         getNext().removeDestinationInfo(context, info);
 
     }
 
+    @Override
     public boolean isFaultTolerantConfiguration() {
         return getNext().isFaultTolerantConfiguration();
     }
 
+    @Override
     public ConnectionContext getAdminConnectionContext() {
         return getNext().getAdminConnectionContext();
     }
 
+    @Override
     public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
         getNext().setAdminConnectionContext(adminConnectionContext);
     }
 
+    @Override
     public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
         return getNext().messagePull(context, pull);
     }
 
+    @Override
     public PListStore getTempDataStore() {
         return getNext().getTempDataStore();
     }
 
+    @Override
     public URI getVmConnectorURI() {
         return getNext().getVmConnectorURI();
     }
 
+    @Override
     public void brokerServiceStarted() {
         getNext().brokerServiceStarted();
     }
 
+    @Override
     public BrokerService getBrokerService() {
         return getNext().getBrokerService();
     }
 
+    @Override
     public boolean isExpired(MessageReference messageReference) {
         return getNext().isExpired(messageReference);
     }
 
+    @Override
     public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
         getNext().messageExpired(context, message, subscription);
     }
 
-    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
-                                      Subscription subscription) {
-        getNext().sendToDeadLetterQueue(context, messageReference, subscription);
+    @Override
+    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
+                                         Subscription subscription) {
+        return getNext().sendToDeadLetterQueue(context, messageReference, subscription);
     }
 
+    @Override
     public Broker getRoot() {
         return getNext().getRoot();
     }
-    
+
+    @Override
     public long getBrokerSequenceId() {
         return getNext().getBrokerSequenceId();
     }
-    
+
+    @Override
     public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
         getNext().fastProducer(context, producerInfo, destination);
     }
 
+    @Override
     public void isFull(ConnectionContext context,Destination destination, Usage usage) {
         getNext().isFull(context,destination, usage);
     }
 
+    @Override
     public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
         getNext().messageConsumed(context, messageReference);
     }
 
+    @Override
     public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
         getNext().messageDelivered(context, messageReference);
     }
 
+    @Override
     public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
         getNext().messageDiscarded(context, sub, messageReference);
     }
 
+    @Override
     public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs) {
         getNext().slowConsumer(context, dest,subs);
     }
-    
-    public void nowMasterBroker() {   
+
+    @Override
+    public void nowMasterBroker() {
        getNext().nowMasterBroker();
     }
 
+    @Override
     public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
             ConsumerControl control) {
         getNext().processConsumerControl(consumerExchange, control);
     }
 
+    @Override
     public Scheduler getScheduler() {
        return getNext().getScheduler();
     }
 
+    @Override
     public ThreadPoolExecutor getExecutor() {
        return getNext().getExecutor();
     }
 
+    @Override
     public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
         getNext().networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp);
     }
 
+    @Override
     public void networkBridgeStopped(BrokerInfo brokerInfo) {
         getNext().networkBridgeStopped(brokerInfo);
     }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1480087&r1=1480086&r2=1480087&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue May  7 21:17:34 2013
@@ -701,7 +701,7 @@ public class RegionBroker extends EmptyB
     }
 
     @Override
-    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription) {
+    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription) {
         try {
             if (node != null) {
                 Message message = node.getMessage();
@@ -726,6 +726,7 @@ public class RegionBroker extends EmptyB
                                 context.setBroker(getRoot());
                             }
                             BrokerSupport.resendNoCopy(context, message, deadLetterDestination);
+                            return true;
                         }
                     } else {
                         if (LOG.isDebugEnabled()) {
@@ -738,6 +739,8 @@ public class RegionBroker extends EmptyB
         } catch (Exception e) {
             LOG.warn("Caught an exception sending to DLQ: " + node, e);
         }
+
+        return false;
     }
 
     @Override

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java?rev=1480087&r1=1480086&r2=1480087&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java Tue May  7 21:17:34 2013
@@ -17,7 +17,9 @@
 package org.apache.activemq.broker.util;
 
 import java.util.Set;
+
 import javax.annotation.PostConstruct;
+
 import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.Connection;
 import org.apache.activemq.broker.ConnectionContext;
@@ -497,7 +499,7 @@ public class LoggingBrokerPlugin extends
     }
 
     @Override
-    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
+    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
                                       Subscription subscription) {
         if (isLogAll() || isLogInternalEvents()) {
             String msg = "Unable to display message.";
@@ -506,7 +508,7 @@ public class LoggingBrokerPlugin extends
 
             LOG.info("Sending to DLQ : " + msg);
         }
-        super.sendToDeadLetterQueue(context, messageReference, subscription);
+        return super.sendToDeadLetterQueue(context, messageReference, subscription);
     }
 
     @Override

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java?rev=1480087&r1=1480086&r2=1480087&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java Tue May  7 21:17:34 2013
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.util;
 
 import java.io.IOException;
+
 import org.apache.activemq.RedeliveryPolicy;
 import org.apache.activemq.ScheduledMessage;
 import org.apache.activemq.broker.Broker;
@@ -126,10 +127,10 @@ public class RedeliveryPlugin extends Br
     }
 
     @Override
-    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
+    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
         if (messageReference.isExpired()) {
             // there are two uses of  sendToDeadLetterQueue, we are only interested in valid messages
-            super.sendToDeadLetterQueue(context, messageReference, subscription);
+            return super.sendToDeadLetterQueue(context, messageReference, subscription);
         } else {
             try {
                 Destination regionDestination = (Destination) messageReference.getRegionDestination();
@@ -145,15 +146,17 @@ public class RedeliveryPlugin extends Br
 
                         scheduleRedelivery(context, messageReference, delay, ++redeliveryCount);
                     } else if (isSendToDlqIfMaxRetriesExceeded()) {
-                        super.sendToDeadLetterQueue(context, messageReference, subscription);
+                        return super.sendToDeadLetterQueue(context, messageReference, subscription);
                     } else {
                         LOG.debug("Discarding message that exceeds max redelivery count( " + maximumRedeliveries + "), " + messageReference.getMessageId());
                     }
                 } else if (isFallbackToDeadLetter()) {
-                    super.sendToDeadLetterQueue(context, messageReference, subscription);
+                    return super.sendToDeadLetterQueue(context, messageReference, subscription);
                 } else {
                     LOG.debug("Ignoring dlq request for:" + messageReference.getMessageId() + ", RedeliveryPolicy not found (and no fallback) for: " + regionDestination.getActiveMQDestination());
                 }
+
+                return false;
             } catch (Exception exception) {
                 // abort the ack, will be effective if client use transactions or individual ack with sync send
                 RuntimeException toThrow =  new RuntimeException("Failed to schedule redelivery for: " + messageReference.getMessageId(), exception);
@@ -203,5 +206,4 @@ public class RedeliveryPlugin extends Br
         }
         return 0;
     }
-
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java?rev=1480087&r1=1480086&r2=1480087&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java Tue May  7 21:17:34 2013
@@ -14,9 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- package org.apache.activemq.plugin;
+package org.apache.activemq.plugin;
 
 import java.util.regex.Pattern;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
 import org.apache.activemq.broker.ConnectionContext;
@@ -28,8 +29,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * @author Filip Hanik
- * @version 1.0
  */
 public class DiscardingDLQBroker extends BrokerFilter {
     public static Logger log = LoggerFactory.getLogger(DiscardingDLQBroker.class);
@@ -45,8 +44,7 @@ public class DiscardingDLQBroker extends
     }
 
     @Override
-    public void sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef,
-                                      Subscription subscription) {
+    public boolean sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef, Subscription subscription) {
         if (log.isTraceEnabled()) {
             log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null));
         }
@@ -58,35 +56,38 @@ public class DiscardingDLQBroker extends
         dest = msg.getDestination();
         destName = dest.getPhysicalName();
 
-        if (dest == null || destName == null ) {
-            //do nothing, no need to forward it
-            skipMessage("NULL DESTINATION",msgRef);
+        if (dest == null || destName == null) {
+            // do nothing, no need to forward it
+            skipMessage("NULL DESTINATION", msgRef);
         } else if (dropAll) {
-            //do nothing
-            skipMessage("dropAll",msgRef);
+            // do nothing
+            skipMessage("dropAll", msgRef);
         } else if (dropTemporaryTopics && dest.isTemporary() && dest.isTopic()) {
-            //do nothing
-            skipMessage("dropTemporaryTopics",msgRef);
+            // do nothing
+            skipMessage("dropTemporaryTopics", msgRef);
         } else if (dropTemporaryQueues && dest.isTemporary() && dest.isQueue()) {
-            //do nothing
-            skipMessage("dropTemporaryQueues",msgRef);
-        } else if (destFilter!=null && matches(destName)) {
-            //do nothing
-            skipMessage("dropOnly",msgRef);
+            // do nothing
+            skipMessage("dropTemporaryQueues", msgRef);
+        } else if (destFilter != null && matches(destName)) {
+            // do nothing
+            skipMessage("dropOnly", msgRef);
         } else {
             dropped = false;
-            next.sendToDeadLetterQueue(ctx, msgRef, subscription);
+            return next.sendToDeadLetterQueue(ctx, msgRef, subscription);
         }
-        if (dropped && getReportInterval()>0) {
-            if ((++dropCount)%getReportInterval() == 0 ) {
-                log.info("Total of "+dropCount+" messages were discarded, since their destination was the dead letter queue");
+
+        if (dropped && getReportInterval() > 0) {
+            if ((++dropCount) % getReportInterval() == 0) {
+                log.info("Total of " + dropCount + " messages were discarded, since their destination was the dead letter queue");
             }
         }
+
+        return false;
     }
 
     public boolean matches(String destName) {
-        for (int i=0; destFilter!=null && i<destFilter.length; i++) {
-            if (destFilter[i]!=null && destFilter[i].matcher(destName).matches()) {
+        for (int i = 0; destFilter != null && i < destFilter.length; i++) {
+            if (destFilter[i] != null && destFilter[i].matcher(destName).matches()) {
                 return true;
             }
         }
@@ -95,7 +96,7 @@ public class DiscardingDLQBroker extends
 
     private void skipMessage(String prefix, MessageReference msgRef) {
         if (log.isDebugEnabled()) {
-            String lmsg = "Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef.getMessage():null);
+            String lmsg = "Discarding DLQ BrokerFilter[" + prefix + "] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null);
             log.debug(lmsg);
         }
     }
@@ -139,5 +140,4 @@ public class DiscardingDLQBroker extends
     public int getReportInterval() {
         return reportInterval;
     }
-
 }

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java?rev=1480087&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java Tue May  7 21:17:34 2013
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ4517Test {
+
+    private BrokerService brokerService;
+    private String connectionUri;
+
+    @Before
+    public void setup() throws Exception {
+        brokerService = new BrokerService();
+
+        connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
+
+        // Configure Dead Letter Strategy
+        DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
+        ((IndividualDeadLetterStrategy)strategy).setUseQueueForQueueMessages(true);
+        ((IndividualDeadLetterStrategy)strategy).setQueuePrefix("DLQ.");
+        strategy.setProcessNonPersistent(false);
+        strategy.setProcessExpired(false);
+
+        // Add policy and individual DLQ strategy
+        PolicyEntry policy = new PolicyEntry();
+        policy.setTimeBeforeDispatchStarts(3000);
+        policy.setDeadLetterStrategy(strategy);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        brokerService.setDestinationPolicy(pMap);
+        brokerService.setPersistent(false);
+        brokerService.start();
+    }
+
+    @After
+    public void stop() throws Exception {
+        brokerService.stop();
+    }
+
+    @Test(timeout=360000)
+    public void test() throws Exception {
+
+        final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
+
+        final AtomicBoolean advised = new AtomicBoolean(false);
+        Connection connection = cf.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination dlqDestination = session.createTopic(AdvisorySupport.MESSAGE_DLQ_TOPIC_PREFIX + ">");
+        MessageConsumer consumer = session.createConsumer(dlqDestination);
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                advised.set(true);
+            }
+        });
+        connection.start();
+
+        ExecutorService service = Executors.newSingleThreadExecutor();
+
+        service.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    Destination destination = session.createTemporaryQueue();
+                    MessageProducer producer = session.createProducer(destination);
+                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                    producer.setTimeToLive(400);
+                    producer.send(session.createTextMessage());
+                    producer.send(session.createTextMessage());
+                    TimeUnit.MILLISECONDS.sleep(500);
+                    connection.close();
+                } catch (Exception e) {
+                }
+            }
+        });
+
+        service.shutdown();
+        assertTrue(service.awaitTermination(1, TimeUnit.MINUTES));
+        assertFalse("Should not get any Advisories for DLQ'd Messages", advised.get());
+    }
+}

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message