qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject svn commit: r888250 - in /qpid/trunk/qpid/java: broker/src/test/java/org/apache/qpid/server/queue/ systests/src/main/java/org/apache/qpid/server/queue/ systests/src/main/java/org/apache/qpid/test/utils/ test-profiles/
Date Tue, 08 Dec 2009 04:05:06 GMT
Author: robbie
Date: Tue Dec  8 04:05:04 2009
New Revision: 888250

URL: http://svn.apache.org/viewvc?rev=888250&view=rev
Log:
QPID-2177: unit and system testing for the new flow controlled related attributes of the Queue
MBean

Modified:
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
    qpid/trunk/qpid/java/test-profiles/08InVMExcludes

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=888250&r1=888249&r2=888250&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
(original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
Tue Dec  8 04:05:04 2009
@@ -318,6 +318,67 @@
 
         }
     }
+    
+    public void testFlowControlProperties() throws Exception
+    {
+        assertTrue(_queueMBean.getCapacity() == 0);
+        assertTrue(_queueMBean.getFlowResumeCapacity() == 0);
+        assertFalse(_queueMBean.isFlowOverfull());
+        
+        //capacity currently 0, try setting FlowResumeCapacity above this
+        try
+        {
+            _queueMBean.setFlowResumeCapacity(1L);
+            fail("Should have failed to allow setting FlowResumeCapacity above Capacity");
+        }
+        catch (IllegalArgumentException ex)
+        {
+            //expected exception
+            assertTrue(_queueMBean.getFlowResumeCapacity() == 0);
+        }
+        
+        //(FlowResume)Capacity currently 0, set both to 2 then try setting Capacity below
this
+        _queueMBean.setCapacity(2L);
+        assertTrue(_queueMBean.getCapacity() == 2L);
+        _queueMBean.setFlowResumeCapacity(2L);
+        assertTrue(_queueMBean.getFlowResumeCapacity() == 2L);
+        
+        try
+        {
+            _queueMBean.setCapacity(1L);
+            fail("Should have failed to allow setting Capacity below FlowResumeCapacity");
+        }
+        catch (IllegalArgumentException ex)
+        {
+            //expected exception
+            assertTrue(_queueMBean.getCapacity() == 2);
+        }
+        
+        //set (FlowResume)Capacity to MESSAGE_SIZE +1 then add a message to the queue
+        _queueMBean.setCapacity(MESSAGE_SIZE + 1);
+        _queueMBean.setFlowResumeCapacity(MESSAGE_SIZE + 1);
+
+        AMQChannel channel = new AMQChannel(_protocolSession, 1, _messageStore);
+        sendMessages(1, true);
+        _queue.checkCapacity(channel);
+        
+        assertFalse(_queueMBean.isFlowOverfull());
+        assertFalse(channel.getBlocking());
+        
+        //add another message then check queue is now overfull and channel blocked
+        sendMessages(1, true);
+        _queue.checkCapacity(channel);
+        
+        assertTrue(_queueMBean.isFlowOverfull());
+        assertTrue(channel.getBlocking());
+        
+        //set FlowResumeCapacity to 2x MESSAGE_SIZE and check queue is now underfull and
channel unblocked
+        _queueMBean.setCapacity(2 * MESSAGE_SIZE);//must increase capacity too
+        _queueMBean.setFlowResumeCapacity(2 * MESSAGE_SIZE);
+        
+        assertFalse(_queueMBean.isFlowOverfull());
+        assertFalse(channel.getBlocking());
+    }
 
     private IncomingMessage message(final boolean immediate, boolean persistent) throws AMQException
     {

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=888250&r1=888249&r2=888250&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
Tue Dec  8 04:05:04 2009
@@ -25,7 +25,9 @@
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
 import org.apache.qpid.server.logging.AbstractTestLogging;
+import org.apache.qpid.test.utils.JMXTestUtils;
 import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.*;
@@ -43,8 +45,6 @@
 
     private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class);
 
-    protected final String QUEUE = "ProducerFlowControl";
-
     private static final int MSG_COUNT = 50;
 
     private Connection producerConnection;
@@ -54,12 +54,18 @@
     private Connection consumerConnection;
     private Session consumerSession;
 
-
     private MessageConsumer consumer;
     private final AtomicInteger _sentMessages = new AtomicInteger();
 
+    private JMXTestUtils _jmxUtils;
+    private boolean _jmxUtilConnected;
+    private static final String USER = "admin";
+
     public void setUp() throws Exception
     {
+        _jmxUtils = new JMXTestUtils(this, USER , USER);
+        _jmxUtils.setUp();
+        _jmxUtilConnected=false;
         super.setUp();
 
         _monitor.reset();
@@ -76,6 +82,17 @@
 
     public void tearDown() throws Exception
     {
+        if(_jmxUtilConnected)
+        {
+            try
+            {
+                _jmxUtils.close();
+            }
+            catch (IOException e)
+            {
+                e.printStackTrace();
+            }
+        }
         producerConnection.close();
         consumerConnection.close();
         super.tearDown();
@@ -84,11 +101,13 @@
     public void testCapacityExceededCausesBlock()
             throws JMSException, NamingException, AMQException, InterruptedException
     {
+        String queueName = getTestQueueName();
+        
         final Map<String,Object> arguments = new HashMap<String, Object>();
         arguments.put("x-qpid-capacity",1000);
         arguments.put("x-qpid-flow-resume-capacity",800);
-        ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false,
false, arguments);
-        queue = new AMQQueue("amq.direct",QUEUE);
+        ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false,
false, arguments);
+        queue = new AMQQueue("amq.direct",queueName);
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
         producer = producerSession.createProducer(queue);
 
@@ -124,11 +143,13 @@
     public void testBrokerLogMessages()
             throws JMSException, NamingException, AMQException, InterruptedException, IOException
     {
+        String queueName = getTestQueueName();
+        
         final Map<String,Object> arguments = new HashMap<String, Object>();
         arguments.put("x-qpid-capacity",1000);
         arguments.put("x-qpid-flow-resume-capacity",800);
-        ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false,
false, arguments);
-        queue = new AMQQueue("amq.direct",QUEUE);
+        ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false,
false, arguments);
+        queue = new AMQQueue("amq.direct",queueName);
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
         producer = producerSession.createProducer(queue);
 
@@ -161,6 +182,8 @@
     public void testClientLogMessages()
             throws JMSException, NamingException, AMQException, InterruptedException, IOException
     {
+        String queueName = getTestQueueName();
+        
         setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
         setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000");
 
@@ -170,8 +193,8 @@
         final Map<String,Object> arguments = new HashMap<String, Object>();
         arguments.put("x-qpid-capacity",1000);
         arguments.put("x-qpid-flow-resume-capacity",800);
-        ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false,
arguments);
-        queue = new AMQQueue("amq.direct",QUEUE);
+        ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false,
arguments);
+        queue = new AMQQueue("amq.direct",queueName);
         ((AMQSession) session).declareAndBind((AMQDestination)queue);
         producer = session.createProducer(queue);
 
@@ -195,11 +218,13 @@
     public void testFlowControlOnCapacityResumeEqual()
             throws JMSException, NamingException, AMQException, InterruptedException
     {
+        String queueName = getTestQueueName();
+        
         final Map<String,Object> arguments = new HashMap<String, Object>();
         arguments.put("x-qpid-capacity",1000);
         arguments.put("x-qpid-flow-resume-capacity",1000);
-        ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false,
false, arguments);
-        queue = new AMQQueue("amq.direct",QUEUE);
+        ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false,
false, arguments);
+        queue = new AMQQueue("amq.direct",queueName);
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
         producer = producerSession.createProducer(queue);
 
@@ -229,6 +254,8 @@
     public void testFlowControlSoak()
             throws Exception, NamingException, AMQException, InterruptedException
     {
+        String queueName = getTestQueueName();
+        
         _sentMessages.set(0);
         final int numProducers = 10;
         final int numMessages = 100;
@@ -237,9 +264,9 @@
         arguments.put("x-qpid-capacity",6000);
         arguments.put("x-qpid-flow-resume-capacity",3000);
 
-        ((AMQSession) consumerSession).createQueue(new AMQShortString(QUEUE), false, false,
false, arguments);
+        ((AMQSession) consumerSession).createQueue(new AMQShortString(queueName), false,
false, false, arguments);
 
-        queue = new AMQQueue("amq.direct",QUEUE);
+        queue = new AMQQueue("amq.direct",queueName);
         ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue);
         consumerConnection.start();
 
@@ -285,6 +312,8 @@
     public void testSendTimeout()
             throws JMSException, NamingException, AMQException, InterruptedException
     {
+        String queueName = getTestQueueName();
+        
         setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
         Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
@@ -292,8 +321,8 @@
         final Map<String,Object> arguments = new HashMap<String, Object>();
         arguments.put("x-qpid-capacity",1000);
         arguments.put("x-qpid-flow-resume-capacity",800);
-        ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false,
arguments);
-        queue = new AMQQueue("amq.direct",QUEUE);
+        ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false,
arguments);
+        queue = new AMQQueue("amq.direct",queueName);
         ((AMQSession) session).declareAndBind((AMQDestination)queue);
         producer = session.createProducer(queue);
 
@@ -310,6 +339,76 @@
         assertNotNull("No timeout exception on sending", e);
 
     }
+    
+    
+    public void testFlowControlAttributeModificationViaJMX()
+    throws JMSException, NamingException, AMQException, InterruptedException, Exception
+    {
+        _jmxUtils.open();
+        _jmxUtilConnected = true;
+        
+        String queueName = getTestQueueName();
+        
+        //create queue
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        arguments.put("x-qpid-capacity",0);
+        arguments.put("x-qpid-flow-resume-capacity",0);
+        ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false,
false, arguments);
+        queue = new AMQQueue("amq.direct",queueName);
+        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+        producer = producerSession.createProducer(queue);
+        
+        Thread.sleep(1000);
+        
+        //Create a JMX MBean proxy for the queue
+        ManagedQueue queueMBean = _jmxUtils.getManagedObject(ManagedQueue.class, _jmxUtils.getQueueObjectName("test",
queueName));
+        assertNotNull(queueMBean);
+        
+        //check current attribute values are 0 as expected
+        assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 0L);
+        assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity()
== 0L);
+        
+        //set new values that will cause flow control to be active, and the queue to become
overfull after 1 message is sent
+        queueMBean.setCapacity(250L);
+        queueMBean.setFlowResumeCapacity(250L);
+        assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 250L);
+        assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity()
== 250L);
+        assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull());
+        
+        // try to send 2 messages (should block after 1)
+        _sentMessages.set(0);
+        sendMessagesAsync(producer, producerSession, 2, 50L);
+
+        Thread.sleep(2000);
+
+        //check only 1 message was sent, and queue is overfull
+        assertEquals("Incorrect number of message sent before blocking", 1, _sentMessages.get());
+        assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
+        
+        //raise the attribute values, causing the queue to become underfull and allow the
second message to be sent.
+        queueMBean.setCapacity(300L);
+        queueMBean.setFlowResumeCapacity(300L);
+        
+        Thread.sleep(2000);
+
+        //check second message was sent, and caused the queue to become overfull again
+        assertEquals("Second message was not sent after lifting FlowResumeCapacity", 2, _sentMessages.get());
+        assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
+        
+        //raise capacity above queue depth, check queue remains overfull as FlowResumeCapacity
still exceeded
+        queueMBean.setCapacity(700L);
+        assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
+        
+        //receive a message, check queue becomes underfull
+        
+        consumer = consumerSession.createConsumer(queue);
+        consumerConnection.start();
+        
+        consumer.receive();
+        assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull());
+        
+        consumer.receive();
+    }
 
     private MessageSender sendMessagesAsync(final MessageProducer producer,
                                             final Session producerSession,

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java?rev=888250&r1=888249&r2=888250&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
Tue Dec  8 04:05:04 2009
@@ -119,6 +119,7 @@
 
         Set<ObjectName> objectNames = allObject.returnObjects();
 
+        _test.assertNotNull("Null ObjectName Set returned", objectNames);
         _test.assertEquals("Incorrect number test vhosts returned", 1, objectNames.size());
 
         // We have verified we have only one value in objectNames so return it
@@ -142,6 +143,7 @@
 
         Set<ObjectName> objectNames = allObject.returnObjects();
 
+        _test.assertNotNull("Null ObjectName Set returned", objectNames);
         _test.assertEquals("Incorrect number of queues with name '" + allObject.querystring
+
                            "' returned", 1, objectNames.size());
 
@@ -167,6 +169,7 @@
 
         Set<ObjectName> objectNames = allObject.returnObjects();
 
+        _test.assertNotNull("Null ObjectName Set returned", objectNames);
         _test.assertEquals("Incorrect number of exchange with name '" + exchange +
                            "' returned", 1, objectNames.size());
 
@@ -181,6 +184,7 @@
 
         Set<ObjectName> objectNames = allObject.returnObjects();
 
+        _test.assertNotNull("Null ObjectName Set returned", objectNames);
         _test.assertEquals("More than one " + managedClass + " returned", 1, objectNames.size());
 
         ObjectName objectName = objectNames.iterator().next();

Modified: qpid/trunk/qpid/java/test-profiles/08InVMExcludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/08InVMExcludes?rev=888250&r1=888249&r2=888250&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/08InVMExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/08InVMExcludes Tue Dec  8 04:05:04 2009
@@ -1,3 +1,6 @@
 //======================================================================
 //Exclude the following tests when running the InVM default test profile
 //======================================================================
+
+// QPID-2097 exclude until InVM JMX test runs are reliable
+org.apache.qpid.server.queue.ProducerFlowControlTest#testFlowControlAttributeModificationViaJMX



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message