qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1207029 [1/3] - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/se...
Date Mon, 28 Nov 2011 09:19:27 GMT
Author: kwall
Date: Mon Nov 28 09:19:15 2011
New Revision: 1207029

URL: http://svn.apache.org/viewvc?rev=1207029&view=rev
Log:
QPID-3642, QPID-3643: Add Dead Letter Queue functionality for 0-8/0-9/0-9-1 paths, and fix isBound methods on FanoutExchange

Applied patch from Keith Wall <keith.wall@gmail.com>, Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy <orudyy@gmail.com>

Added:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/RejectBehaviour.java
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
    qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
    qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
    qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
    qpid/trunk/qpid/java/management/common/src/test/java/org/apache/qpid/management/common/mbeans/ManagedQueueTest.java
    qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java
    qpid/trunk/qpid/java/test-profiles/CPPExcludes

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Mon Nov 28 09:19:15 2011
@@ -20,8 +20,8 @@ package org.apache.qpid.server;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.management.common.mbeans.ManagedBroker;
 import org.apache.qpid.management.common.mbeans.ManagedQueue;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
@@ -243,7 +244,13 @@ public class AMQBrokerManagerMBean exten
      */
     public void createNewQueue(String queueName, String owner, boolean durable) throws JMException, MBeanException
     {
-        AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName));
+        createNewQueue(queueName, owner, durable, null);
+    }
+
+    public void createNewQueue(String queueName, String owner, boolean durable, Map<String,Object> arguments) throws JMException
+    {
+        final AMQShortString queueNameAsAMQShortString = new AMQShortString(queueName);
+        AMQQueue queue = _queueRegistry.getQueue(queueNameAsAMQShortString);
         if (queue != null)
         {
             throw new JMException("The queue \"" + queueName + "\" already exists.");
@@ -258,11 +265,18 @@ public class AMQBrokerManagerMBean exten
                 ownerShortString = new AMQShortString(owner);
             }
 
+            FieldTable args = null;
+            if(arguments != null)
+            {
+                args = FieldTable.convertToFieldTable(arguments);
+            }
             final VirtualHost virtualHost = getVirtualHost();
-            queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, false, virtualHost, null);
+
+            queue = AMQQueueFactory.createAMQQueueImpl(queueNameAsAMQShortString, durable, ownerShortString,
+                                                       false, false, getVirtualHost(), args);
             if (queue.isDurable() && !queue.isAutoDelete())
             {
-                _durableConfig.createQueue(queue);
+                _durableConfig.createQueue(queue, args);
             }
 
             virtualHost.getBindingFactory().addBinding(queueName, queue, _defaultExchange, null);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Mon Nov 28 09:19:15 2011
@@ -22,7 +22,6 @@ package org.apache.qpid.server;
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.framing.AMQMethodBody;
@@ -53,6 +52,7 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
@@ -63,6 +63,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.InboundMessageAdapter;
 import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -692,6 +693,31 @@ public class AMQChannel implements Sessi
 
     }
 
+    public boolean isMaxDeliveryCountEnabled(final long deliveryTag)
+    {
+        final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+        if (queueEntry != null)
+        {
+            final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+            return maximumDeliveryCount > 0;
+        }
+
+        return false;
+    }
+
+    public boolean isDeliveredTooManyTimes(final long deliveryTag)
+    {
+        final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+        if (queueEntry != null)
+        {
+            final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+            final int numDeliveries = queueEntry.getDeliveryCount();
+            return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount;
+        }
+
+        return false;
+    }
+
     /**
      * Called to resend all outstanding unacknowledged messages to this same channel.
      *
@@ -739,9 +765,9 @@ public class AMQChannel implements Sessi
             QueueEntry message = entry.getValue();
             long deliveryTag = entry.getKey();
 
+            //Amend the delivery counter as the client hasn't seen these messages yet.
+            message.decrementDeliveryCount();
 
-
-            ServerMessage msg = message.getMessage();
             AMQQueue queue = message.getQueue();
 
             // Our Java Client will always suspend the channel when resending!
@@ -799,6 +825,10 @@ public class AMQChannel implements Sessi
         {
             QueueEntry message = entry.getValue();
             long deliveryTag = entry.getKey();
+
+            //Amend the delivery counter as the client hasn't seen these messages yet.
+            message.decrementDeliveryCount();
+
             _unacknowledgedMessageMap.remove(deliveryTag);
 
             message.setRedelivered();
@@ -1058,6 +1088,7 @@ public class AMQChannel implements Sessi
                 _session.registerMessageDelivered(entry.getMessage().getSize());
                 getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
                                                                                deliveryTag, sub.getConsumerTag());
+                entry.incrementDeliveryCount();
             }
 
         };
@@ -1246,7 +1277,6 @@ public class AMQChannel implements Sessi
     {
         private final Collection<QueueEntry> _ackedMessages;
 
-
         public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages)
         {
             _ackedMessages = ackedMessages;
@@ -1479,4 +1509,54 @@ public class AMQChannel implements Sessi
             }
         }
     }
+
+    public void deadLetter(long deliveryTag) throws AMQException
+    {
+        final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
+        final QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag);
+
+        if (rejectedQueueEntry == null)
+        {
+            _logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
+            return;
+        }
+        else
+        {
+            final ServerMessage msg = rejectedQueueEntry.getMessage();
+
+            final AMQQueue queue = rejectedQueueEntry.getQueue();
+
+            final Exchange altExchange = queue.getAlternateExchange();
+            unackedMap.remove(deliveryTag);
+
+            if (altExchange == null)
+            {
+                _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+                _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+                rejectedQueueEntry.discard();
+                return;
+            }
+
+            final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry);
+
+            final ArrayList<? extends BaseQueue> destinationQueues = altExchange.route(m);
+
+            if (destinationQueues == null || destinationQueues.isEmpty())
+            {
+                _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
+                _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+                rejectedQueueEntry.discard();
+                return;
+            }
+
+            rejectedQueueEntry.routeToAlternate();
+
+            //output operational logging for each delivery post commit
+            for (final BaseQueue destinationQueue : destinationQueues)
+            {
+                _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getNameShortString().asString()));
+            }
+
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java Mon Nov 28 09:19:15 2011
@@ -63,7 +63,9 @@ public class QueueConfiguration extends 
                             "flowResumeCapacity",
                             "lvq",
                             "lvqKey",
-                            "sortKey"
+                            "sortKey",
+                            "maximumDeliveryCount",
+                            "deadLetterQueues"
         };
     }
 
@@ -173,6 +175,19 @@ public class QueueConfiguration extends 
         return getStringValue("sortKey", null);
     }
 
+    public int getMaxDeliveryCount()
+    {
+        return getIntValue("maximumDeliveryCount", _vHostConfig.getMaxDeliveryCount());
+    }
+
+    /**
+     * Check if dead letter queue delivery is enabled, deferring to the virtualhost configuration if not set.
+     */
+    public boolean isDeadLetterQueueEnabled()
+    {
+        return getBooleanValue("deadLetterQueues", _vHostConfig.isDeadLetterQueueEnabled());
+    }
+
     public static class QueueConfig extends ConfigurationPlugin
     {
         @Override

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Mon Nov 28 09:19:15 2011
@@ -40,6 +40,8 @@ import org.apache.commons.configuration.
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.signal.SignalHandlerTask;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -810,4 +812,34 @@ public class ServerConfiguration extends
     {
         return getBooleanValue("management.managementRightsInferAllAccess", true);
     }
+
+    public int getMaxDeliveryCount()
+    {
+        return getConfig().getInt("maximumDeliveryCount", 0);
+    }
+
+    /**
+     * Check if dead letter queue delivery is enabled, defaults to disabled if not set.
+     */
+    public boolean isDeadLetterQueueEnabled()
+    {
+        return getConfig().getBoolean("deadLetterQueues", false);
+    }
+
+    /**
+     * String to affix to end of queue name when generating an alternate exchange for DLQ purposes.
+     */
+    public String getDeadLetterExchangeSuffix()
+    {
+        return getConfig().getString("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+    }
+
+    /**
+     * String to affix to end of queue name when generating a queue for DLQ purposes.
+     */
+    public String getDeadLetterQueueSuffix()
+    {
+        return getConfig().getString("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Mon Nov 28 09:19:15 2011
@@ -347,4 +347,18 @@ public class VirtualHostConfiguration ex
     {
         return getLongValue("transactionTimeout.idleClose", 0L);
     }
+
+    public int getMaxDeliveryCount()
+    {
+        return getIntValue("queues.maximumDeliveryCount", ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount());
+    }
+
+    /**
+     * Check if dead letter queue delivery is enabled, deferring to the broker configuration if not set.
+     */
+    public boolean isDeadLetterQueueEnabled()
+    {
+        return getBooleanValue("queues.deadLetterQueues", ApplicationRegistry.getInstance().getConfiguration().isDeadLetterQueueEnabled());
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Mon Nov 28 09:19:15 2011
@@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhos
 public class DefaultExchangeFactory implements ExchangeFactory
 {
     private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class);
+    public static final String DEFAULT_DLE_NAME_SUFFIX = "_DLE";
 
     private Map<AMQShortString, ExchangeType<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, ExchangeType<? extends Exchange>>();
     private final VirtualHost _host;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Mon Nov 28 09:19:15 2011
@@ -117,7 +117,7 @@ public class FanoutExchange extends Abst
 
     public boolean isBound(AMQShortString routingKey, AMQQueue queue)
     {
-        return _queues.contains(queue);
+        return _queues.containsKey(queue);
     }
 
     public boolean isBound(AMQShortString routingKey)
@@ -129,7 +129,7 @@ public class FanoutExchange extends Abst
     public boolean isBound(AMQQueue queue)
     {
 
-        return _queues.contains(queue);
+        return _queues.containsKey(queue);
     }
 
     public boolean hasBindings()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Mon Nov 28 09:19:15 2011
@@ -127,8 +127,6 @@ public class BasicGetMethodHandler imple
         final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
         {
 
-            int _msg;
-
             public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
             throws AMQException
             {
@@ -137,6 +135,7 @@ public class BasicGetMethodHandler imple
                 {
                     session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
                                                                             deliveryTag, queue.getMessageCount());
+                    entry.incrementDeliveryCount();
                 }
                 else
                 {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Mon Nov 28 09:19:15 2011
@@ -59,7 +59,6 @@ public class BasicRejectMethodHandler im
         {
             _logger.debug("Rejecting:" + body.getDeliveryTag() +
                           ": Requeue:" + body.getRequeue() +
-                          //": Resend:" + evt.getMethod().resend +
                           " on channel:" + channel.debugIdentity());
         }
 
@@ -70,26 +69,23 @@ public class BasicRejectMethodHandler im
         if (message == null)
         {
             _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
-//            throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
         }                 
         else
         {
             if (message.isQueueDeleted())
             {
-                _logger.warn("Message's Queue as already been purged, unable to Reject. " +
-                             "Dropping message should use Dead Letter Queue");
+                _logger.warn("Message's Queue has already been purged, dropping message");
                 message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
                 if(message != null)
                 {
                     message.discard();
                 }
-                //sendtoDeadLetterQueue(msg)
                 return;
             }
 
             if (message.getMessage() == null)
             {
-                _logger.warn("Message as already been purged, unable to Reject.");
+                _logger.warn("Message has already been purged, unable to Reject.");
                 return;
             }
 
@@ -98,27 +94,44 @@ public class BasicRejectMethodHandler im
             {
                 _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
                               ": Requeue:" + body.getRequeue() +
-                              //": Resend:" + evt.getMethod().resend +
                               " on channel:" + channel.debugIdentity());
             }
 
-            // If we haven't requested message to be resent to this consumer then reject it from ever getting it.
-            //if (!evt.getMethod().resend)
-            {
-                message.reject();
-            }
+            message.reject();
 
             if (body.getRequeue())
             {
                 channel.requeue(deliveryTag);
+
+                //this requeue represents a message rejected from the pre-dispatch queue
+                //therefore we need to amend the delivery counter.
+                message.decrementDeliveryCount();
             }
             else
             {
-                _logger.warn("Dropping message as requeue not required and there is no dead letter queue");
-                 message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
-                //sendtoDeadLetterQueue(AMQMessage message)
-//                message.queue = channel.getDefaultDeadLetterQueue();
-//                channel.requeue(deliveryTag);
+                 final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
+                 _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
+                 if (maxDeliveryCountEnabled)
+                 {
+                     final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
+                     _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
+                     if (deliveredTooManyTimes)
+                     {
+                         channel.deadLetter(body.getDeliveryTag());
+                     }
+                     else
+                     {
+                         //this requeue represents a message rejected because of a recover/rollback that we
+                         //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+                         //and therefore need to increment the delivery counter so we cancel out the effect
+                         //of the AMQChannel#resend() decrement.
+                         message.incrementDeliveryCount();
+                     }
+                 }
+                 else
+                 {
+                     channel.deadLetter(body.getDeliveryTag());
+                 }
             }
         }
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Mon Nov 28 09:19:15 2011
@@ -72,7 +72,12 @@ public class TxRollbackHandler implement
             };
 
             channel.rollback(task);
-            
+
+            //Now resend all the unacknowledged messages back to the original subscribers.
+            //(Must be done after the TxnRollback-ok response).
+            // Why, are we not allowed to send messages back to client before the ok method?
+            channel.resend(false);
+
         }
         catch (AMQException e)
         {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties Mon Nov 28 09:19:15 2011
@@ -32,3 +32,7 @@ FLOW_REMOVED = CHN-1006 : Flow Control R
 # 0 - time in milliseconds
 OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms
 IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms
+
+DISCARDMSG_NOALTEXCH = CHN-1009 : Discarded message : {0,number} as no alternate exchange configured for queue : {1} routing key : {2}
+DISCARDMSG_NOROUTE = CHN-1010 : Discarded message : {0,number} as no binding on alternate exchange : {1}
+DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java Mon Nov 28 09:19:15 2011
@@ -22,6 +22,7 @@ package org.apache.qpid.server.message;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
@@ -65,7 +66,6 @@ public class AMQMessage extends Abstract
 
     WeakReference<AMQChannel> _channelRef;
 
-
     public AMQMessage(StoredMessage<MessageMetaData> handle)
     {
         this(handle, null);
@@ -122,7 +122,15 @@ public class AMQMessage extends Abstract
 
     public String getRoutingKey()
     {
-        // TODO
+        MessageMetaData messageMetaData = getMessageMetaData();
+        if (messageMetaData != null)
+        {
+            AMQShortString routingKey = messageMetaData.getMessagePublishInfo().getRoutingKey();
+            if (routingKey != null)
+            {
+                return routingKey.asString();
+            }
+        }
         return null;
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Nov 28 09:19:15 2011
@@ -213,6 +213,8 @@ public interface AMQQueue extends Managa
 
     void setAlternateExchange(Exchange exchange);
 
+    void setAlternateExchange(String exchangeName);
+
     Map<String, Object> getArguments();
 
     void checkCapacity(AMQChannel channel);
@@ -272,4 +274,22 @@ public interface AMQQueue extends Managa
     ManagedObject getManagedObject();
 
     void setExclusive(boolean exclusive) throws AMQException;
+
+    /**
+     * Gets the maximum delivery count.  If a message on this queue
+     * is delivered more than maximumDeliveryCount times, the message will be
+     * routed to the {@link #getAlternateExchange()} (if set), or otherwise
+     * discarded. 0 indicates that the maximum delivery count should not be enforced.
+     *
+     * @return maximum delivery count
+     */
+    int getMaximumDeliveryCount();
+
+    /**
+     * Sets the maximum delivery count.
+     *
+     * @param maximumDeliveryCount maximum delivery count
+     */
+    public void setMaximumDeliveryCount(final int maximumDeliveryCount);
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Mon Nov 28 09:19:15 2011
@@ -24,9 +24,15 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class AMQQueueFactory
@@ -37,6 +43,11 @@ public class AMQQueueFactory
     public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
     public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
 
+    public static final String DLQ_ROUTING_KEY = "dlq";
+    public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
+    public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
+    public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
+
     private abstract static class QueueProperty
     {
 
@@ -80,6 +91,24 @@ public class AMQQueueFactory
 
     }
 
+    private abstract static class QueueIntegerProperty extends QueueProperty
+    {
+        public QueueIntegerProperty(String argumentName)
+        {
+            super(argumentName);
+        }
+
+        public void setPropertyValue(AMQQueue queue, Object value)
+        {
+            if(value instanceof Number)
+            {
+                setPropertyValue(queue, ((Number)value).intValue());
+            }
+
+        }
+        abstract void setPropertyValue(AMQQueue queue, int value);
+    }
+
     private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
             new QueueLongProperty("x-qpid-maximum-message-age")
             {
@@ -122,8 +151,14 @@ public class AMQQueueFactory
                 {
                     queue.setFlowResumeCapacity(value);
                 }
+            },
+            new QueueIntegerProperty(X_QPID_MAXIMUM_DELIVERY_COUNT)
+            {
+                public void setPropertyValue(AMQQueue queue, int value)
+                {
+                    queue.setMaximumDeliveryCount(value);
+                }
             }
-
     };
 
 
@@ -149,8 +184,13 @@ public class AMQQueueFactory
                                               String owner,
                                               boolean autoDelete,
                                               boolean exclusive,
-                                              VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException
+                                              VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException
     {
+        if (queueName == null)
+        {
+            throw new IllegalArgumentException("Queue name must not be null");
+        }
+
         // Access check
         if (!virtualHost.getSecurityManager().authoriseCreateQueue(autoDelete, durable, exclusive, null, null, new AMQShortString(queueName), owner))
         {
@@ -158,6 +198,13 @@ public class AMQQueueFactory
             throw new AMQSecurityException(description);
         }
 
+        QueueConfiguration queueConfiguration = virtualHost.getConfiguration().getQueueConfiguration(queueName);
+        boolean isDLQEnabled = isDLQEnabled(autoDelete, arguments, queueConfiguration);
+        if (isDLQEnabled)
+        {
+            validateDLNames(queueName);
+        }
+
         int priorities = 1;
         String conflationKey = null;
         String sortingKey = null;
@@ -219,10 +266,63 @@ public class AMQQueueFactory
             }
         }
 
-        return q;
+        if(isDLQEnabled)
+        {
+            final String dlExchangeName = getDeadLetterExchangeName(queueName);
+            final String dlQueueName = getDeadLetterQueueName(queueName);
 
-    }
+            final ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+            final ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+            final QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+            Exchange dlExchange = null;
+            synchronized(exchangeRegistry)
+            {
+                dlExchange = exchangeRegistry.getExchange(dlExchangeName);
+
+                if(dlExchange == null)
+                {
+                    dlExchange = exchangeFactory.createExchange(new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
+
+                    exchangeRegistry.registerExchange(dlExchange);
+
+                    //enter the dle in the persistent store
+                    virtualHost.getDurableConfigurationStore().createExchange(dlExchange);
+                }
+            }
+
+            AMQQueue dlQueue = null;
+
+            synchronized(queueRegistry)
+            {
+                dlQueue = queueRegistry.getQueue(dlQueueName);
+
+                if(dlQueue == null)
+                {
+                    //set args to disable DLQ'ing/MDC from the DLQ itself, preventing loops etc
+                    final Map<String, Object> args = new HashMap<String, Object>();
+                    args.put(X_QPID_DLQ_ENABLED, false);
+                    args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0);
+
+                    dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, exclusive, virtualHost, args);
+
+                    //enter the dlq in the persistent store
+                    virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
+                }
+            }
+
+            //ensure the queue is bound to the exchange
+            if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue))
+            {
+                //actual routing key used does not matter due to use of fanout exchange,
+                //but we will make the key 'dlq' as it can be logged at creation.
+                virtualHost.getBindingFactory().addBinding(DLQ_ROUTING_KEY, dlQueue, dlExchange, null);
+            }
+            q.setAlternateExchange(dlExchange);
+        }
 
+        return q;
+    }
 
     public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException
     {
@@ -250,10 +350,108 @@ public class AMQQueueFactory
             arguments = new HashMap<String,Object>();
             arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey());
         }
+        if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled())
+        {
+            if (arguments == null)
+            {
+                arguments = new HashMap<String,Object>();
+            }
+            arguments.put(X_QPID_DLQ_ENABLED, true);
+        }
 
         AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments);
         q.configure(config);
         return q;
     }
 
+
+    /**
+     * Validates DLQ and DLE names
+     * <p>
+     * The DLQ name and DL exchange name must be validated up front to keep
+     * integrity in cases where the queue name passes the validation check but
+     * the DLQ name or DL exchange name fails it. Otherwise, the queue might be
+     * created while creation of the DL exchange and/or the DLQ fails.
+     * <p>
+     *
+     * @param name
+     *            queue name
+     * @throws IllegalArgumentException
+     *             thrown if the length of the DLQ name or DL exchange name exceeds 255
+     */
+    protected static void validateDLNames(String name)
+    {
+        // check if DLQ name and DLQ exchange name do not exceed 255
+        String exchangeName = getDeadLetterExchangeName(name);
+        if (exchangeName.length() > AMQShortString.MAX_LENGTH)
+        {
+            throw new IllegalArgumentException("DL exchange name '" + exchangeName
+                    + "' length exceeds limit of " + AMQShortString.MAX_LENGTH + " characters for queue " + name);
+        }
+        String queueName = getDeadLetterQueueName(name);
+        if (queueName.length() > AMQShortString.MAX_LENGTH)
+        {
+            throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of "
+                    + AMQShortString.MAX_LENGTH + " characters for queue " + name);
+        }
+    }
+
+    /**
+     * Checks if DLQ is enabled for the queue.
+     *
+     * @param autoDelete
+     *            queue auto-delete flag
+     * @param arguments
+     *            queue arguments
+     * @param qConfig
+     *            queue configuration
+     * @return true if DLQ enabled
+     */
+    protected static boolean isDLQEnabled(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig)
+    {
+        //feature is not to be enabled for temporary queues or when explicitly disabled by argument
+        if (!autoDelete)
+        {
+            boolean dlqArgumentPresent = arguments != null && arguments.containsKey(X_QPID_DLQ_ENABLED);
+            if (dlqArgumentPresent || qConfig.isDeadLetterQueueEnabled())
+            {
+                boolean dlqEnabled = true;
+                if (dlqArgumentPresent)
+                {
+                    Object argument = arguments.get(X_QPID_DLQ_ENABLED);
+                    dlqEnabled = argument instanceof Boolean && ((Boolean)argument).booleanValue();
+                }
+                return dlqEnabled;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Generates a dead letter queue name for a given queue name
+     *
+     * @param name
+     *            queue name
+     * @return DLQ name
+     */
+    protected static String getDeadLetterQueueName(String name)
+    {
+        ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
+        String dlQueueName = name + serverConfig.getDeadLetterQueueSuffix();
+        return dlQueueName;
+    }
+
+    /**
+     * Generates a dead letter exchange name for a given queue name
+     *
+     * @param name
+     *            queue name
+     * @return DL exchange name
+     */
+    protected static String getDeadLetterExchangeName(String name)
+    {
+        ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
+        String dlExchangeName = name + serverConfig.getDeadLetterExchangeSuffix();
+        return dlExchangeName;
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Mon Nov 28 09:19:15 2011
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.ContentHe
 import org.apache.qpid.management.common.mbeans.ManagedQueue;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.message.ServerMessage;
@@ -80,7 +81,7 @@ public class AMQQueueMBean extends AMQMa
     private final String _queueName;
     // OpenMBean data types for viewMessages method
 
-    private static OpenType[] _msgAttributeTypes = new OpenType[5]; // AMQ message attribute types.
+    private static OpenType[] _msgAttributeTypes = new OpenType[6]; // AMQ message attribute types.
     private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
     private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
 
@@ -139,6 +140,7 @@ public class AMQQueueMBean extends AMQMa
         _msgAttributeTypes[2] = SimpleType.LONG; // For size
         _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
         _msgAttributeTypes[4] = SimpleType.LONG; // For queue position
+        _msgAttributeTypes[5] = SimpleType.INTEGER; // For delivery count
 
         _messageDataType = new CompositeType("Message", "AMQ Message", 
                 VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]),
@@ -177,6 +179,11 @@ public class AMQQueueMBean extends AMQMa
         return _queue.getMessageCount();
     }
 
+    public Integer getMaximumDeliveryCount()
+    {
+        return _queue.getMaximumDeliveryCount();
+    }
+
     public Long getMaximumMessageSize()
     {
         return _queue.getMaximumMessageSize();
@@ -295,6 +302,18 @@ public class AMQQueueMBean extends AMQMa
         }
     }
 
+    public void setAlternateExchange(String exchangeName)
+    {
+        _queue.setAlternateExchange(exchangeName);
+    }
+
+    public String getAlternateExchange()
+    {
+        Exchange exchange = _queue.getAlternateExchange();
+        String name = exchange == null ? null : exchange.getName();
+        return name == null ? null : name;
+    }
+
     /**
      * Checks if there is any notification to be send to the listeners
      */
@@ -472,7 +491,7 @@ public class AMQQueueMBean extends AMQMa
                     ContentHeaderBody headerBody = msg.getContentHeaderBody();
                     // Create header attributes list
                     headerAttributes = getMessageHeaderProperties(headerBody);
-                    itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position};
+                    itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
                 }
                 else if(serverMsg instanceof MessageTransferMessage)
                 {
@@ -481,13 +500,13 @@ public class AMQQueueMBean extends AMQMa
 
                     // Create header attributes list
                     headerAttributes = getMessageTransferMessageHeaderProps(msg);
-                    itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position};
+                    itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
                 }
                 else
                 {
                     //unknown message
                     headerAttributes = new String[]{"N/A"};
-                    itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position};
+                    itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
                 }
                 
                 CompositeData messageData = new CompositeDataSupport(_messageDataType, 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java Mon Nov 28 09:19:15 2011
@@ -24,7 +24,7 @@ package org.apache.qpid.server.queue;
 import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.AMQMessageHeader;
 
-class InboundMessageAdapter implements InboundMessage
+public class InboundMessageAdapter implements InboundMessage
 {
 
     private QueueEntry _entry;
@@ -33,7 +33,7 @@ class InboundMessageAdapter implements I
     {
     }
 
-    InboundMessageAdapter(QueueEntry entry)
+    public InboundMessageAdapter(QueueEntry entry)
     {
         _entry = entry;
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Mon Nov 28 09:19:15 2011
@@ -1,5 +1,7 @@
 package org.apache.qpid.server.queue;
 
+import java.util.Collection;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.message.ServerMessage;
@@ -234,4 +236,16 @@ public interface QueueEntry extends Comp
      * @return true if entry is either DEQUED or DELETED state
      */
     boolean isDispensed();
+
+    /**
+     * Number of times this queue entry has been delivered.
+     *
+     * @return delivery count
+     */
+    int getDeliveryCount();
+
+    void incrementDeliveryCount();
+
+    void decrementDeliveryCount();
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Nov 28 09:19:15 2011
@@ -31,10 +31,13 @@ import org.apache.qpid.server.subscripti
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
@@ -80,6 +83,12 @@ public abstract class QueueEntryImpl imp
 
     private volatile int _deliveryState;
 
+    /** Number of times this message has been delivered */
+    private volatile int _deliveryCount = 0;
+    private static final AtomicIntegerFieldUpdater<QueueEntryImpl> _deliveryCountUpdater = AtomicIntegerFieldUpdater
+                    .newUpdater(QueueEntryImpl.class, "_deliveryCount");
+
+
 
     public QueueEntryImpl(QueueEntryList<?> queueEntryList)
     {
@@ -406,50 +415,51 @@ public abstract class QueueEntryImpl imp
     public void routeToAlternate()
     {
         final AMQQueue currentQueue = getQueue();
-            Exchange alternateExchange = currentQueue.getAlternateExchange();
+        Exchange alternateExchange = currentQueue.getAlternateExchange();
 
-            if(alternateExchange != null)
+        if (alternateExchange != null)
+        {
+            final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
+            final ServerMessage message = getMessage();
+            if (rerouteQueues != null && rerouteQueues.size() != 0)
             {
-                final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
-                final ServerMessage message = getMessage();
-                if(rerouteQueues != null && rerouteQueues.size() != 0)
-                {
-                    ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
 
-                    txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() {
-                        public void postCommit()
+                ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+
+                txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()
+                {
+                    public void postCommit()
+                    {
+                        try
                         {
-                            try
+                            for (BaseQueue queue : rerouteQueues)
                             {
-                                for(BaseQueue queue : rerouteQueues)
-                                {
-                                    queue.enqueue(message);
-                                }
-                            }
-                            catch (AMQException e)
-                            {
-                                throw new RuntimeException(e);
+                                queue.enqueue(message);
                             }
                         }
-
-                        public void onRollback()
+                        catch (AMQException e)
                         {
-
+                            throw new RuntimeException(e);
                         }
-                    });
-                    txn.dequeue(currentQueue,message,
-                                new ServerTransaction.Action()
-                                {
-                                    public void postCommit()
-                                    {
-                                        discard();
-                                    }
+                    }
+
+                    public void onRollback()
+                    {
 
-                                    public void onRollback()
-                                    {
+                    }
+                });
+                txn.dequeue(currentQueue, message, new ServerTransaction.Action()
+                {
+                    public void postCommit()
+                    {
+                        discard();
+                    }
+
+                    public void onRollback()
+                    {
 
-                                    }
-                                });
+                    }
+                });
                 }
             }
     }
@@ -524,4 +534,19 @@ public abstract class QueueEntryImpl imp
         return _state.isDispensed();
     }
 
+    public int getDeliveryCount()
+    {
+        return _deliveryCount;
+    }
+
+    public void incrementDeliveryCount()
+    {
+        _deliveryCountUpdater.incrementAndGet(this);
+    }
+
+    public void decrementDeliveryCount()
+    {
+        _deliveryCountUpdater.decrementAndGet(this);
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Mon Nov 28 09:19:15 2011
@@ -187,7 +187,8 @@ public class SimpleAMQQueue implements A
     private long _createTime = System.currentTimeMillis();
     private ConfigurationPlugin _queueConfiguration;
 
-
+    /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
+    private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
 
     protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
     {
@@ -356,6 +357,22 @@ public class SimpleAMQQueue implements A
         _alternateExchange = exchange;
     }
 
+    public void setAlternateExchange(String exchangeName)
+    {
+        if(exchangeName == null || exchangeName.equals(""))
+        {
+            _alternateExchange = null;
+            return;
+        }
+
+        Exchange exchange = getVirtualHost().getExchangeRegistry().getExchange(new AMQShortString(exchangeName));
+        if (exchange == null)
+        {
+            throw new RuntimeException("Exchange '" + exchangeName + "' is not registered with the VirtualHost.");
+        }
+        setAlternateExchange(exchange);
+    }
+
     public Map<String, Object> getArguments()
     {
         return _arguments;
@@ -521,13 +538,12 @@ public class SimpleAMQQueue implements A
         //Reconfigure the queue for to reflect this new binding.
         ConfigurationPlugin config = getVirtualHost().getConfiguration().getQueueConfiguration(this);
 
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration);
-        }
-
         if (config != null)
         {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration);
+            }
             // Reconfigure with new config.
             configure(config);
         }
@@ -2108,6 +2124,7 @@ public class SimpleAMQQueue implements A
                 setMaximumMessageSize(((QueueConfiguration)config).getMaximumMessageSize());
                 setMaximumMessageCount(((QueueConfiguration)config).getMaximumMessageCount());
                 setMinimumAlertRepeatGap(((QueueConfiguration)config).getMinimumAlertRepeatGap());
+                setMaximumDeliveryCount(((QueueConfiguration)config).getMaxDeliveryCount());
                 _capacity = ((QueueConfiguration)config).getCapacity();
                 _flowResumeCapacity = ((QueueConfiguration)config).getFlowResumeCapacity();
             }
@@ -2229,4 +2246,15 @@ public class SimpleAMQQueue implements A
     {
         return _logActor;
     }
+
+    public int getMaximumDeliveryCount()
+    {
+        return _maximumDeliveryCount;
+    }
+
+    public void setMaximumDeliveryCount(final int maximumDeliveryCount)
+    {
+        _maximumDeliveryCount = maximumDeliveryCount;
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java Mon Nov 28 09:19:15 2011
@@ -20,18 +20,31 @@
  */
 package org.apache.qpid.server;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.server.configuration.ServerConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.queue.AMQPriorityQueue;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.TestApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
 
-public class AMQBrokerManagerMBeanTest extends InternalBrokerBaseCase
+public class AMQBrokerManagerMBeanTest extends QpidTestCase
 {
     private QueueRegistry _queueRegistry;
     private ExchangeRegistry _exchangeRegistry;
@@ -95,14 +108,86 @@ public class AMQBrokerManagerMBeanTest e
         assertTrue("New queue should be bound to default exchange", defaultExchange.isBound(new AMQShortString(queueName)));
     }
 
+    /**
+     * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument does cause the
+     * maximum delivery count to be set on the Queue.
+     */
+    public void testCreateNewQueueWithMaximumDeliveryCount() throws Exception
+    {
+        final Map<String,Object> args = new HashMap<String, Object>();
+        args.put(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5);
+
+        final AMQShortString queueName = new AMQShortString("testCreateNewQueueWithMaximumDeliveryCount");
+
+        final QueueRegistry qReg = _vHost.getQueueRegistry();
+
+        assertNull("The queue should not yet exist", qReg.getQueue(queueName));
+
+        final ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
+        mbean.createNewQueue(queueName.asString(), "test", false, args);
+
+        final AMQQueue createdQueue = qReg.getQueue(queueName);
+        assertNotNull("The queue was not registered as expected", createdQueue);
+        assertEquals("Unexpected maximum delivery count", 5, createdQueue.getMaximumDeliveryCount());
+    }
+
+    /**
+     * Tests that setting the {@link AMQQueueFactory#X_QPID_PRIORITIES} argument prompts creation of
+     * a Priority Queue.
+     */
+    public void testCreatePriorityQueue() throws Exception
+    {
+        int numPriorities = 7;
+        Map<String,Object> args = new HashMap<String, Object>();
+        args.put(AMQQueueFactory.X_QPID_PRIORITIES, numPriorities);
+
+        AMQShortString queueName = new AMQShortString("testCreatePriorityQueue");
+
+        QueueRegistry qReg = _vHost.getQueueRegistry();
+
+        assertNull("The queue should not yet exist", qReg.getQueue(queueName));
+
+        ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
+        mbean.createNewQueue(queueName.asString(), "test", false, args);
+
+        AMQQueue queue = qReg.getQueue(queueName);
+        assertEquals("Queue is not a priority queue", AMQPriorityQueue.class, queue.getClass());
+        assertEquals("Number of priorities supported was not as expected", numPriorities, ((AMQPriorityQueue)queue).getPriorities());
+    }
+
     @Override
     public void setUp() throws Exception
     {
         super.setUp();
+
+        CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
+
+        XMLConfiguration configXml = new XMLConfiguration();
+        configXml.addProperty("virtualhosts.virtualhost(-1).name", "test");
+        configXml.addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName());
+
+        ServerConfiguration configuration = new ServerConfiguration(configXml);
+
+        ApplicationRegistry registry = new TestApplicationRegistry(configuration);
+        ApplicationRegistry.initialise(registry);
+        registry.getVirtualHostRegistry().setDefaultVirtualHostName("test");
+
         IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
         _vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
         _queueRegistry = _vHost.getQueueRegistry();
         _exchangeRegistry = _vHost.getExchangeRegistry();
     }
 
+    @Override
+    public void tearDown() throws Exception
+    {
+        try
+        {
+            super.tearDown();
+        }
+        finally
+        {
+            ApplicationRegistry.remove();
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java Mon Nov 28 09:19:15 2011
@@ -24,6 +24,8 @@ import junit.framework.TestCase;
 import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
 
 public class QueueConfigurationTest extends TestCase
 {
@@ -43,11 +45,71 @@ public class QueueConfigurationTest exte
         fullEnv.setProperty("queues.maximumMessageSize", 1);
         fullEnv.setProperty("queues.maximumMessageCount", 1);
         fullEnv.setProperty("queues.minimumAlertRepeatGap", 1);
+        fullEnv.setProperty("queues.deadLetterQueues", true);
+        fullEnv.setProperty("queues.maximumDeliveryCount", 5);
 
         _fullHostConf = new VirtualHostConfiguration("test", fullEnv);
 
     }
 
+    public void testMaxDeliveryCount() throws Exception
+    {
+        try
+        {
+            ApplicationRegistry registry = new TestApplicationRegistry(new ServerConfiguration(_env));
+            ApplicationRegistry.initialise(registry);
+
+            // Check default value
+            QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
+            assertEquals("Unexpected default server configuration for max delivery count ", 0, qConf.getMaxDeliveryCount());
+
+            // Check explicit value
+            VirtualHostConfiguration vhostConfig = overrideConfiguration("maximumDeliveryCount", 7);
+            qConf = new QueueConfiguration("test", vhostConfig);
+            assertEquals("Unexpected host configuration for max delivery count", 7, qConf.getMaxDeliveryCount());
+
+            // Check inherited value
+            qConf = new QueueConfiguration("test",  _fullHostConf);
+            assertEquals("Unexpected queue configuration for max delivery count", 5, qConf.getMaxDeliveryCount());
+
+        }
+        finally
+        {
+            ApplicationRegistry.remove();
+        }
+    }
+
+    /**
+     * Tests that the default setting for DLQ configuration is disabled, and verifies that it can be overridden
+     * at a broker or virtualhost level.
+     * @throws Exception
+     */
+    public void testIsDeadLetterQueueEnabled() throws Exception
+    {
+        try
+        {
+            ApplicationRegistry registry = new TestApplicationRegistry(new ServerConfiguration(_env));
+            ApplicationRegistry.initialise(registry);
+
+            // Check default value
+            QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
+            assertFalse("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled());
+
+            // Check explicit value
+            VirtualHostConfiguration vhostConfig = overrideConfiguration("deadLetterQueues", true);
+            qConf = new QueueConfiguration("test", vhostConfig);
+            assertTrue("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled());
+
+            // Check inherited value
+            qConf = new QueueConfiguration("test", _fullHostConf);
+            assertTrue("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled());
+        }
+        finally
+        {
+            ApplicationRegistry.remove();
+        }
+    }
+
     public void testGetMaximumMessageAge() throws ConfigurationException
     {
         // Check default value

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java Mon Nov 28 09:19:15 2011
@@ -25,6 +25,7 @@ import static org.apache.qpid.transport.
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.Writer;
 import java.util.Locale;
 
 import org.apache.commons.configuration.ConfigurationException;
@@ -1459,4 +1460,117 @@ public class ServerConfigurationTest ext
                     ce.getMessage());
         }
     }
+
+    public void testMaxDeliveryCountDefault() throws Exception
+    {
+        final ServerConfiguration serverConfig = new ServerConfiguration(_config);
+        assertEquals(0, serverConfig.getMaxDeliveryCount());
+    }
+
+    public void testMaxDeliveryCount() throws Exception
+    {
+        _config.setProperty("maximumDeliveryCount", 5);
+        final ServerConfiguration serverConfig = new ServerConfiguration(_config);
+        assertEquals(5, serverConfig.getMaxDeliveryCount());
+    }
+
+    /**
+     * Tests that DLQ settings read from an XML configuration file apply at broker, virtualhost and queue level
+     */
+    public void testDeadLetterQueueConfigurationFile() throws Exception
+    {
+        // Write config
+        File xml = File.createTempFile(getClass().getName(), "xml");
+        xml.deleteOnExit();
+        FileWriter config = new FileWriter(xml);
+        config.write("<broker>\n");
+        writeSecurity(config);
+        config.write("<deadLetterQueues>true</deadLetterQueues>\n");
+        config.write("<virtualhosts>\n");
+        config.write("<virtualhost>\n");
+        config.write("<name>test</name>\n");
+        config.write("<test>\n");
+        config.write("<queues>\n");
+        config.write("<deadLetterQueues>false</deadLetterQueues>\n");
+        config.write("<queue>\n");
+        config.write("<name>biggles</name>\n");
+        config.write("<biggles>\n");
+        config.write("<deadLetterQueues>true</deadLetterQueues>\n");
+        config.write("</biggles>\n");
+        config.write("</queue>\n");
+        config.write("<queue>\n");
+        config.write("<name>beetle</name>\n");
+        config.write("<beetle />\n");
+        config.write("</queue>\n");
+        config.write("</queues>\n");
+        config.write("</test>\n");
+        config.write("</virtualhost>\n");
+        config.write("<virtualhost>\n");
+        config.write("<name>extra</name>\n");
+        config.write("<extra>\n");
+        config.write("<queues>\n");
+        config.write("<queue>\n");
+        config.write("<name>r2d2</name>\n");
+        config.write("<r2d2>\n");
+        config.write("<deadLetterQueues>false</deadLetterQueues>\n");
+        config.write("</r2d2>\n");
+        config.write("</queue>\n");
+        config.write("<queue>\n");
+        config.write("<name>c3p0</name>\n");
+        config.write("<c3p0 />\n");
+        config.write("</queue>\n");
+        config.write("</queues>\n");
+        config.write("</extra>\n");
+        config.write("</virtualhost>\n");
+        config.write("</virtualhosts>\n");
+        config.write("</broker>\n");
+        config.close();
+
+        // Load config
+        ApplicationRegistry.remove();
+        ApplicationRegistry registry = new ConfigurationFileApplicationRegistry(xml);
+        ApplicationRegistry.initialise(registry);
+        ServerConfiguration serverConfiguration = ApplicationRegistry.getInstance().getConfiguration();
+
+        VirtualHostConfiguration test = serverConfiguration.getVirtualHostConfig("test");
+        assertNotNull("Host 'test' is not found", test);
+        VirtualHostConfiguration extra = serverConfiguration.getVirtualHostConfig("extra");
+        assertNotNull("Host 'extra' is not found", extra); // check 'extra' itself so a missing host fails here, not as an NPE below
+
+        QueueConfiguration biggles = test.getQueueConfiguration("biggles");
+        QueueConfiguration beetle = test.getQueueConfiguration("beetle");
+        QueueConfiguration r2d2 = extra.getQueueConfiguration("r2d2");
+        QueueConfiguration c3p0 = extra.getQueueConfiguration("c3p0");
+
+        // Validate config
+        assertTrue("Broker DLQ should be configured as enabled", serverConfiguration.isDeadLetterQueueEnabled());
+        assertFalse("Test vhost DLQ should be configured as disabled", test.isDeadLetterQueueEnabled());
+        assertTrue("Extra vhost DLQ should be enabled, using broker default", extra.isDeadLetterQueueEnabled());
+        assertTrue("Biggles queue DLQ should be configured as enabled", biggles.isDeadLetterQueueEnabled());
+        assertFalse("Beetle queue DLQ should be disabled, using test vhost default", beetle.isDeadLetterQueueEnabled());
+        assertFalse("R2D2 queue DLQ should be configured as disabled", r2d2.isDeadLetterQueueEnabled());
+        assertTrue("C3P0 queue DLQ should be enabled, using broker default", c3p0.isDeadLetterQueueEnabled());
+    }
+
+    /**
+     * Convenience method to output required security preamble for broker config
+     */
+    private void writeSecurity(Writer out) throws Exception
+    {
+        out.write("\t<management><enabled>false</enabled></management>\n");
+        out.write("\t<security>\n");
+        out.write("\t\t<pd-auth-manager>\n");
+        out.write("\t\t\t<principal-database>\n");
+        out.write("\t\t\t\t<class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>\n");
+        out.write("\t\t\t\t<attributes>\n");
+        out.write("\t\t\t\t\t<attribute>\n");
+        out.write("\t\t\t\t\t\t<name>passwordFile</name>\n");
+        out.write("\t\t\t\t\t\t<value>/dev/null</value>\n");
+        out.write("\t\t\t\t\t</attribute>\n");
+        out.write("\t\t\t\t</attributes>\n");
+        out.write("\t\t\t</principal-database>\n");
+        out.write("\t\t\t<jmx-access>/dev/null</jmx-access>\n");
+        out.write("\t\t</pd-auth-manager>\n");
+        out.write("\t</security>\n");
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java Mon Nov 28 09:19:15 2011
@@ -21,7 +21,6 @@ package org.apache.qpid.server.configura
 
 
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.queue.AMQPriorityQueue;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -34,19 +33,6 @@ public class VirtualHostConfigurationTes
 {
 
     @Override
-    public void setUp() throws Exception
-    {
-        super.setUp();
-        // Set the default configuration items
-        getConfigXml().clear();
-        getConfigXml().addProperty("virtualhosts.virtualhost(-1).name", "test");
-        getConfigXml().addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName());
-
-        getConfigXml().addProperty("virtualhosts.virtualhost.name", getName());
-        getConfigXml().addProperty("virtualhosts.virtualhost."+getName()+".store.class", TestableMemoryMessageStore.class.getName());
-    }
-
-    @Override
     public void createBroker()
     {
         // Prevent auto broker startup
@@ -134,6 +120,88 @@ public class VirtualHostConfigurationTes
         assertEquals(3, bTest.getMaximumMessageAge());
     }
 
+    public void testMaxDeliveryCount() throws Exception
+    {
+        // Set up a vhost-wide maximum delivery count of 5, a queue (biggles) overriding it with 4, and a queue (beetle) with no override
+        getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.maximumDeliveryCount", 5);
+        getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "biggles");
+        getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.queue.biggles.maximumDeliveryCount", 4);
+        getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "beetle");
+
+        // Start the broker now.
+        super.createBroker();
+
+        // Get vhosts
+        VirtualHost test = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName());
+
+        // Set explicitly on the virtualhost
+        assertEquals("Unexpected maximum delivery count for test vhost", 5, test.getConfiguration().getMaxDeliveryCount());
+
+        // Not set on the queue, so expected to match the virtualhost value
+        assertEquals("Beetle queue should use the maximum delivery count of the test vhost", test.getConfiguration().getMaxDeliveryCount(), test.getConfiguration().getQueueConfiguration("beetle").getMaxDeliveryCount());
+
+        // Overridden on the queue
+        assertEquals("Unexpected maximum delivery count for biggles queue", 4, test.getConfiguration().getQueueConfiguration("biggles").getMaxDeliveryCount());
+    }
+
+    /**
+     * Tests the full set of configuration options for enabling DLQs in the broker configuration.
+     */
+    public void testIsDeadLetterQueueEnabled() throws Exception
+    {
+        // Set up vhosts and queues
+        getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.deadLetterQueues", "true");
+        getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "biggles");
+        getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.queue.biggles.deadLetterQueues", "false");
+        getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "beetle");
+
+
+        getConfigXml().addProperty("virtualhosts.virtualhost.name", getName() + "Extra");
+        getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "r2d2");
+        getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues.queue.r2d2.deadLetterQueues", "true");
+        getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "c3p0");
+        getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.class", TestableMemoryMessageStore.class.getName());
+
+        // Start the broker now.
+        super.createBroker();
+
+        // Get vhosts
+        VirtualHost test = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName());
+        VirtualHost extra = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName() + "Extra");
+
+        // Enabled specifically
+        assertTrue("Test vhost DLQ was configured as enabled", test.getConfiguration().isDeadLetterQueueEnabled());
+        assertTrue("r2d2 queue DLQ was configured as enabled", extra.getConfiguration().getQueueConfiguration("r2d2").isDeadLetterQueueEnabled());
+
+        // Enabled by test vhost default
+        assertTrue("beetle queue DLQ was configured as enabled", test.getConfiguration().getQueueConfiguration("beetle").isDeadLetterQueueEnabled());
+
+        // Disabled specifically
+        assertFalse("Biggles queue DLQ was configured as disabled", test.getConfiguration().getQueueConfiguration("biggles").isDeadLetterQueueEnabled());
+
+        // Using broker default of disabled
+        assertFalse("Extra vhost DLQ disabled, using broker default", extra.getConfiguration().isDeadLetterQueueEnabled());
+        assertFalse("c3p0 queue DLQ was configured as disabled", extra.getConfiguration().getQueueConfiguration("c3p0").isDeadLetterQueueEnabled());
+
+        // Get queues
+        AMQQueue biggles = test.getQueueRegistry().getQueue(new AMQShortString("biggles"));
+        AMQQueue beetle = test.getQueueRegistry().getQueue(new AMQShortString("beetle"));
+        AMQQueue r2d2 = extra.getQueueRegistry().getQueue(new AMQShortString("r2d2"));
+        AMQQueue c3p0 = extra.getQueueRegistry().getQueue(new AMQShortString("c3p0"));
+
+        // Disabled specifically for this queue, overriding virtualhost setting
+        assertNull("Biggles queue should not have alt exchange as DLQ should be configured as disabled: " + biggles.getAlternateExchange(), biggles.getAlternateExchange());
+
+        // Enabled for all queues on the virtualhost
+        assertNotNull("Beetle queue should have an alt exchange as DLQ should be enabled, using test vhost default", beetle.getAlternateExchange());
+
+        // Enabled specifically for this queue, overriding the default broker setting of disabled
+        assertNotNull("R2D2 queue should have an alt exchange as DLQ should be configured as enabled", r2d2.getAlternateExchange());
+
+        // Disabled by the default broker setting
+        assertNull("C3PO queue should not have an alt exchange as DLQ should be disabled, using broker default", c3p0.getAlternateExchange());
+    }
+
     /**
      * Test that the house keeping pool sizes is correctly processed
      *
@@ -173,7 +241,7 @@ public class VirtualHostConfigurationTes
                      vhost.getHouseKeepingTaskCount());
 
         // Currently the two are tasks:
-        // ExpiredMessageTask from VirtualHost        
+        // ExpiredMessageTask from VirtualHost
         // UpdateTask from the QMF ManagementExchange
     }
 
@@ -214,7 +282,7 @@ public class VirtualHostConfigurationTes
      {
          getConfigXml().addProperty("virtualhosts.virtualhost.testSecurityAuthenticationNameRejected.security.authentication.name",
                  "testdb");
-         
+
          try
          {
              super.createBroker();

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1207029&r1=1207028&r2=1207029&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Mon Nov 28 09:19:15 2011
@@ -490,6 +490,22 @@ public class AbstractHeadersExchangeTest
                 {
                     return null;
                 }
+
+                @Override
+                public int getDeliveryCount()
+                {
+                    return 0;
+                }
+
+                @Override
+                public void incrementDeliveryCount()
+                {
+                }
+
+                @Override
+                public void decrementDeliveryCount()
+                {
+                }
             };
 
             if(action != null)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message