qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject [1/2] qpid-broker-j git commit: QPID-7815: Add support for reject policy
Date Mon, 10 Jul 2017 13:15:04 GMT
Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 8468aa00b -> c5e340f0e


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index dd562b9..0da0063 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -72,12 +72,12 @@ import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.RejectType;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.AlternateBinding;
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.ExclusivityPolicy;
 import org.apache.qpid.server.model.LifetimePolicy;
@@ -381,6 +381,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
         {
             MessagePublishInfo info = _currentMessage.getMessagePublishInfo();
             String routingKey = AMQShortString.toString(info.getRoutingKey());
+            String exchangeName = AMQShortString.toString(info.getExchange());
 
             try
             {
@@ -395,7 +396,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                 {
                     _confirmedMessageCounter++;
                 }
-                Runnable finallyAction = null;
 
                 long bodySize = _currentMessage.getSize();
                 long timestamp = contentHeader.getProperties().getTimestamp();
@@ -424,8 +424,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                     final StoredMessage<MessageMetaData> storedMessage = handle.allContentAdded();
 
                     final AMQMessage amqMessage = createAMQMessage(storedMessage);
-                    MessageReference reference = amqMessage.newReference();
-                    try
+                    try (MessageReference reference = amqMessage.newReference())
                     {
 
                         _currentMessage = null;
@@ -464,7 +463,58 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                         int enqueues = result.send(_transaction, immediate ? _immediateAction
: null);
                         if (enqueues == 0)
                         {
-                            finallyAction = handleUnroutableMessage(amqMessage);
+                            boolean mandatory = amqMessage.isMandatory();
+
+                            boolean closeOnNoRoute = _connection.isCloseWhenNoRoute();
+                            if (_logger.isDebugEnabled())
+                            {
+                                _logger.debug("Unroutable message exchange='{}', routing
key='{}', mandatory={},"
+                                        + " transactionalSession={}, closeOnNoRoute={}, confirmOnPublish={}",
+                                        exchangeName,
+                                        routingKey,
+                                        mandatory,
+                                        isTransactional(),
+                                        closeOnNoRoute,
+                                        _confirmOnPublish);
+                            }
+
+                            int errorCode = ErrorCodes.NO_ROUTE;
+                            String errorMessage = String.format("No route for message with
exchange '%s' and routing key '%s'",
+                                                                exchangeName,
+                                                                routingKey);
+                            if (result.containsReject(RejectType.LIMIT_EXCEEDED))
+                            {
+                                errorCode = ErrorCodes.RESOURCE_ERROR;
+                                errorMessage = errorMessage + ":" + result.getRejectReason();
+                            }
+
+                            if (mandatory
+                                && isTransactional()
+                                && !_confirmOnPublish
+                                && _connection.isCloseWhenNoRoute())
+                            {
+                                _connection.sendConnectionClose(errorCode, errorMessage,
_channelId);
+                            }
+                            else
+                            {
+                                if (mandatory || amqMessage.isImmediate())
+                                {
+                                    if (_confirmOnPublish)
+                                    {
+                                        _connection.writeFrame(new AMQFrame(_channelId,
+                                                                            new BasicNackBody(_confirmedMessageCounter,
+                                                                                        
     false,
+                                                                                        
     false)));
+                                    }
+                                    _transaction.addPostTransactionAction(new WriteReturnAction(errorCode,
+                                                                                        
       errorMessage,
+                                                                                        
       amqMessage));
+                                }
+                                else
+                                {
+                                    message(ExchangeMessages.DISCARDMSG(exchangeName, routingKey));
+                                }
+                            }
                         }
                         else
                         {
@@ -476,17 +526,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                             }
                             incrementOutstandingTxnsIfNecessary();
                         }
-
-                    }
-                    finally
-                    {
-                        reference.release();
-                        if (finallyAction != null)
-                        {
-                            finallyAction.run();
-                        }
                     }
-
                 }
                 finally
                 {
@@ -503,70 +543,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
 
     }
 
-    /**
-     * Either throws a {@link AMQConnectionException} or returns the message
-     *
-     * Pre-requisite: the current message is judged to have no destination queues.
-     *
-     * @throws AMQConnectionException if the message is mandatory close-on-no-route
-     * @see AMQPConnection_0_8Impl#isCloseWhenNoRoute()
-     */
-    private Runnable handleUnroutableMessage(AMQMessage message)
-    {
-        boolean mandatory = message.isMandatory();
-
-        String exchangeName = AMQShortString.toString(message.getMessagePublishInfo().getExchange());
-        String routingKey = AMQShortString.toString(message.getMessagePublishInfo().getRoutingKey());
-
-        final String description = String.format(
-                "[Exchange: %s, Routing key: %s]",
-                exchangeName,
-                routingKey);
-
-        boolean closeOnNoRoute = _connection.isCloseWhenNoRoute();
-        Runnable returnVal = null;
-        if(_logger.isDebugEnabled())
-        {
-            _logger.debug(String.format(
-                    "Unroutable message %s, mandatory=%s, transactionalSession=%s, closeOnNoRoute=%s",
-                    description, mandatory, isTransactional(), closeOnNoRoute));
-        }
-
-        if (mandatory && isTransactional() && !_confirmOnPublish &&
_connection.isCloseWhenNoRoute())
-        {
-            returnVal = new Runnable()
-                        {
-                            @Override
-                            public void run()
-                            {
-                                _connection.sendConnectionClose(ErrorCodes.NO_ROUTE,
-                                        "No route for message " + description, _channelId);
-
-                            }
-                        };
-        }
-        else
-        {
-            if (mandatory || message.isImmediate())
-            {
-                if(_confirmOnPublish)
-                {
-                    _connection.writeFrame(new AMQFrame(_channelId, new BasicNackBody(_confirmedMessageCounter,
false, false)));
-                }
-                _transaction.addPostTransactionAction(new WriteReturnAction(ErrorCodes.NO_ROUTE,
-                                                                            "No Route for
message "
-                                                                            + description,
-                                                                            message));
-            }
-            else
-            {
-
-                message(ExchangeMessages.DISCARDMSG(exchangeName, routingKey));
-            }
-        }
-        return returnVal;
-    }
-
     private void publishContentBody(ContentBody contentBody)
     {
         if (_logger.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index 575dafd..c26dbab 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -29,6 +29,7 @@ import java.util.List;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.RejectType;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
@@ -123,23 +124,47 @@ public class ExchangeDestination extends QueueDestination
                     return null;
                 }};
 
-        final RoutingResult result = _exchange.route(message,
-                                                     routingAddress,
-                                                     instanceProperties);
-        int enqueues = result.send(txn, null);
-
-        if(enqueues == 0)
+        final RoutingResult result = _exchange.route(message, routingAddress, instanceProperties);
+        final int enqueues = result.send(txn, null);
+        if (enqueues == 0)
         {
-            _exchange.getEventLogger().message(ExchangeMessages.DISCARDMSG(_exchange.getName(),
routingAddress));
+            if (!_discardUnroutable)
+            {
+                if (result.isRejected())
+                {
+                    AmqpError error;
+                    if (result.containsReject(RejectType.LIMIT_EXCEEDED))
+                    {
+                        error = AmqpError.RESOURCE_LIMIT_EXCEEDED;
+                    }
+                    else if (result.containsReject(RejectType.PRECONDITION_FAILED))
+                    {
+                        error = AmqpError.PRECONDITION_FAILED;
+                    }
+                    else
+                    {
+                        error = AmqpError.ILLEGAL_STATE;
+                    }
+                    return createdRejectedOutcome(error, result.getRejectReason());
+                }
+                else
+                {
+                    return createdRejectedOutcome(AmqpError.NOT_FOUND,
+                                                  String.format("Unknown destination '%s'",
routingAddress));
+                }
+            }
+            else
+            {
+                _exchange.getEventLogger().message(ExchangeMessages.DISCARDMSG(_exchange.getName(),
routingAddress));
+            }
         }
-
-        return enqueues == 0 && !_discardUnroutable ? createdRejectedOutcome(routingAddress)
: ACCEPTED;
+        return ACCEPTED;
     }
 
-    private Outcome createdRejectedOutcome(final String routingAddress)
+    private Outcome createdRejectedOutcome(AmqpError errorCode, String errorMessage)
     {
         Rejected rejected = new Rejected();
-        final Error notFoundError = new Error(AmqpError.NOT_FOUND, "Unknown destination '"+routingAddress+'"');
+        final Error notFoundError = new Error(errorCode, errorMessage);
         rejected.setError(notFoundError);
         return rejected;
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index ac598d6..1332a95 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.RejectType;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
@@ -107,27 +108,41 @@ public class NodeReceivingDestination implements ReceivingDestination
                 }};
 
         RoutingResult result = _destination.route(message, routingAddress, instanceProperties);
-        int enqueues = result.send(txn, null);
+        final int enqueues = result.send(txn, null);
 
-        if(enqueues == 0)
+        if (enqueues == 0)
         {
-            _eventLogger.message(ExchangeMessages.DISCARDMSG(_destination.getName(), routingAddress));
-        }
-
-        if (enqueues == 0 && !_discardUnroutable)
-        {
-            if (result.hasNotAcceptingRoutableQueue())
+            if (!_discardUnroutable)
             {
-                return createdRejectedOutcome(AmqpError.PRECONDITION_FAILED,
-                                              result.getUnacceptanceCause());
+                if (result.isRejected())
+                {
+                    AmqpError error;
+                    if (result.containsReject(RejectType.LIMIT_EXCEEDED))
+                    {
+                        error = AmqpError.RESOURCE_LIMIT_EXCEEDED;
+                    }
+                    else if (result.containsReject(RejectType.PRECONDITION_FAILED))
+                    {
+                        error = AmqpError.PRECONDITION_FAILED;
+                    }
+                    else
+                    {
+                        error = AmqpError.ILLEGAL_STATE;
+                    }
+                    return createdRejectedOutcome(error, result.getRejectReason());
+                }
+                else
+                {
+                    return createdRejectedOutcome(AmqpError.NOT_FOUND,
+                                                  String.format("Unknown destination '%s'",
routingAddress));
+                }
+            }
+            else
+            {
+                _eventLogger.message(ExchangeMessages.DISCARDMSG(_destination.getName(),
routingAddress));
             }
-            return createdRejectedOutcome(AmqpError.NOT_FOUND,
-                                        "Unknown destination '" + routingAddress + '"');
-        }
-        else
-        {
-            return ACCEPTED;
         }
+        return ACCEPTED;
     }
 
     private Outcome createdRejectedOutcome(AmqpError errorCode, String errorMessage)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 095ab03..f8fe98c 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -91,6 +91,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     private final AtomicLong _inMemorySize = new AtomicLong();
     private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
     private final Set<StoredJDBCMessage<?>> _messages = Collections.newSetFromMap(new
ConcurrentHashMap<>());
+    private final Set<MessageDeleteListener> _messageDeleteListeners = Collections.newSetFromMap(new
ConcurrentHashMap<>());
 
     protected abstract boolean isMessageStoreOpen();
 
@@ -1332,6 +1333,18 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         }
     }
 
+    @Override
+    public void addMessageDeleteListener(final MessageDeleteListener listener)
+    {
+        _messageDeleteListeners.add(listener);
+    }
+
+    @Override
+    public void removeMessageDeleteListener(final MessageDeleteListener listener)
+    {
+        _messageDeleteListeners.remove(listener);
+    }
+
     private class StoredJDBCMessage<T extends StorableMessageMetaData> implements StoredMessage<T>,
MessageHandle<T>
     {
 
@@ -1568,6 +1581,13 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             }
             _messageDataRef = null;
             _inMemorySize.addAndGet(-bytesCleared);
+            if (!_messageDeleteListeners.isEmpty())
+            {
+                for (final MessageDeleteListener messageDeleteListener : _messageDeleteListeners)
+                {
+                    messageDeleteListener.messageDeleted(this);
+                }
+            }
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java b/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java
index 482be63..4e6a1d8 100644
--- a/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java
+++ b/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java
@@ -20,100 +20,118 @@
 */
 package org.apache.qpid.test.client.queue;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.util.AMQExceptionTestUtil;
+import org.apache.qpid.server.model.OverflowPolicy;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 public class QueuePolicyTest extends QpidBrokerTestCase
 {
     private Connection _connection;
-    
+
     @Override
     public void setUp() throws Exception
     {
         super.setUp();
-        _connection = getConnection() ;
-        _connection.start();
+        _connection = getConnection();
     }
-    
-    @Override
-    public void tearDown() throws Exception
-    {
-        _connection.close();
-        super.tearDown();
-    }
-    
-    /**
-     * Test Goal : To create a ring queue programitcally with max queue count using the
-     *             address string and observe that it works as expected.
-     */
-    public void testRejectPolicy() throws Exception
+
+    public void testRejectPolicyMessageDepth() throws Exception
     {
-        String addr = "ADDR:queue; {create: always, " +
-        "node: {x-bindings: [{exchange : 'amq.direct', key : test}], " +
-        "x-declare:{ arguments : {'qpid.max_count':5} }}}";
-                
-        Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        
-        Destination dest = ssn.createQueue(addr);
-        MessageConsumer consumer = ssn.createConsumer(dest);
-        MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test"));
-        
-        for (int i=0; i<6; i++)
+        Session session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+        Destination destination = createQueue(session, OverflowPolicy.REJECT, 5);
+        MessageProducer producer = session.createProducer(destination);
+
+        for (int i = 0; i < 5; i++)
         {
-            prod.send(ssn.createMessage());
+            producer.send(session.createMessage());
+            session.commit();
         }
-        
+
         try
-        {   
-            prod.send(ssn.createMessage());
-            ((AMQSession)ssn).sync();
+        {
+            producer.send(session.createMessage());
+            session.commit();
             fail("The client did not receive an exception after exceeding the queue limit");
         }
-        catch (AMQException e)
+        catch (JMSException e)
         {
-            AMQExceptionTestUtil.assertAMQException("The correct error code is not set",
506, e);
+            assertTrue("Unexpected exception: " + e.getMessage(),
+                       e.getMessage().contains("Maximum depth exceeded"));
         }
+
+        Connection secondConnection = getConnection();
+        secondConnection.start();
+
+        Session secondSession = secondConnection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer consumer = secondSession.createConsumer(destination);
+        Message receivedMessage = consumer.receive(getReceiveTimeout());
+        assertNotNull("Message  is not received", receivedMessage);
+        secondSession.commit();
+
+        MessageProducer secondProducer = secondSession.createProducer(destination);
+        secondProducer.send(secondSession.createMessage());
+        secondSession.commit();
     }
-    
-    /**
-     * Test Goal : To create a ring queue programmatically using the address string and observe
-     *             that it works as expected.
-     */
+
     public void testRingPolicy() throws Exception
     {
-        Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        
-        String addr = "ADDR:my-ring-queue; {create: always, " +
-        "node: {x-bindings: [{exchange : 'amq.direct', key : test}], " +
-               "x-declare:{arguments : {'qpid.policy_type':ring, 'qpid.max_count':2} }}}";
-    
-        Destination dest = ssn.createQueue(addr);
-        MessageConsumer consumer = ssn.createConsumer(dest);
-        MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test"));
-        
-        _connection.stop();
-
-        prod.send(ssn.createTextMessage("Test1"));
-        prod.send(ssn.createTextMessage("Test2"));
-        prod.send(ssn.createTextMessage("Test3"));
+        Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
+        Destination destination = createQueue(session, OverflowPolicy.RING, 2);
+        MessageProducer producer = session.createProducer(destination);
+        producer.send(session.createTextMessage("Test1"));
+        producer.send(session.createTextMessage("Test2"));
+        producer.send(session.createTextMessage("Test3"));
+
+        MessageConsumer consumer = session.createConsumer(destination);
         _connection.start();
-        
-        TextMessage msg = (TextMessage)consumer.receive(1000);
-        assertNotNull("The consumer should receive the msg with body='Test2'", msg);
-        assertEquals("Unexpected first message","Test2",msg.getText());
-        
-        msg = (TextMessage)consumer.receive(1000);
-        assertNotNull("The consumer should receive the msg with body='Test3'", msg);
-        assertEquals("Unexpected second message","Test3", msg.getText());
+
+        TextMessage receivedMessage = (TextMessage) consumer.receive(getReceiveTimeout());
+        assertNotNull("The consumer should receive the receivedMessage with body='Test2'",
receivedMessage);
+        assertEquals("Unexpected first message", "Test2", receivedMessage.getText());
+
+        receivedMessage = (TextMessage) consumer.receive(getReceiveTimeout());
+        assertNotNull("The consumer should receive the receivedMessage with body='Test3'",
receivedMessage);
+        assertEquals("Unexpected second message", "Test3", receivedMessage.getText());
+    }
+
+
+    private Destination createQueue(Session session, OverflowPolicy overflowPolicy, int msgLimit)
+            throws Exception
+    {
+        Destination destination;
+        String testQueueName = getTestQueueName();
+        if (isBroker10())
+        {
+            final Map<String, Object> arguments = new HashMap<>();
+            arguments.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, overflowPolicy.name());
+            arguments.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES,
msgLimit);
+            createEntityUsingAmqpManagement(testQueueName, session, "org.apache.qpid.Queue",
arguments);
+            destination = getQueueFromName(session, testQueueName);
+        }
+        else
+        {
+            String address = String.format("ADDR: %s; {create: always, node: {"
+                                           + "x-bindings: [{exchange : 'amq.direct', key
: %s}],"
+                                           + "x-declare:{arguments : {'qpid.policy_type':
%s, 'qpid.max_count': %d}}"
+                                           + "}}",
+                                           testQueueName,
+                                           testQueueName, overflowPolicy.name(), msgLimit);
+            destination = session.createQueue(address);
+            session.createConsumer(destination).close();
+        }
+        return destination;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/test-profiles/Java10Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java10Excludes b/test-profiles/Java10Excludes
index f8b4416..98d5d62 100644
--- a/test-profiles/Java10Excludes
+++ b/test-profiles/Java10Excludes
@@ -23,7 +23,6 @@ org.apache.qpid.jndi.PropertiesFileInitialContextFactoryTest#*
 // Exclude Address based tests
 org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#*
 org.apache.qpid.test.client.destination.AddressBasedDestinationTest#*
-org.apache.qpid.test.client.queue.QueuePolicyTest#*
 
 // Exclude tests of AMQP 0-x features
 org.apache.qpid.test.client.ImmediateAndMandatoryPublishingTest#*

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/test-profiles/JavaExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/JavaExcludes b/test-profiles/JavaExcludes
index 53f234d..c236ad5 100644
--- a/test-profiles/JavaExcludes
+++ b/test-profiles/JavaExcludes
@@ -20,8 +20,5 @@
 // QPID-1823: this takes ages to run
 org.apache.qpid.client.SessionCreateTest#*
 
-//QPID-2845: The queue reject policy type used by the C++ broker is not currently supported by the Apache Qpid Broker for Java
-org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy
-
 // Test runs for 2 minutes testing that subtraction works
 org.apache.qpid.server.util.SerialTest#testCorollary1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/test-profiles/JavaPre010Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/JavaPre010Excludes b/test-profiles/JavaPre010Excludes
index a6aa35b..334d67d 100644
--- a/test-profiles/JavaPre010Excludes
+++ b/test-profiles/JavaPre010Excludes
@@ -40,8 +40,6 @@ org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testJMSDesti
 
 // The new addressing based syntax is not supported for AMQP 0-8/0-9 versions
 org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#*
-org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy
-org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy
 org.apache.qpid.test.unit.message.JMSPropertiesTest#testApplicationProperties
 org.apache.qpid.server.queue.AddressBasedSortedQueueTest#*
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message