qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject svn commit: r916691 - in /qpid/branches/0.5.x-dev/qpid/java: ./ systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
Date Fri, 26 Feb 2010 14:34:53 GMT
Author: robbie
Date: Fri Feb 26 14:34:53 2010
New Revision: 916691

URL: http://svn.apache.org/viewvc?rev=916691&view=rev
Log:
QPID-2417: update the auto-ack tests to leave an unacked message on the durable subscription's
queue between disconnect and reconnect, create consumer1 on the correct connection in SessionPerConnection
tests, add single connection NO_ACK test.

merged from trunk r915866

Modified:
    qpid/branches/0.5.x-dev/qpid/java/   (props changed)
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java

Propchange: qpid/branches/0.5.x-dev/qpid/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 26 14:34:53 2010
@@ -1,2 +1,2 @@
 /qpid/branches/java-broker-0-10/qpid/java:829414,829575
-/qpid/trunk/qpid/java:835115,884634-884635,884838,885765,887948,887950-887952,887994,888246,888248,888250,888345,888348,889645,891323-891332,892228,896674,896692-896693,900919,900943,902231,907851
+/qpid/trunk/qpid/java:835115,884634-884635,884838,885765,887948,887950-887952,887994,888246,888248,888250,888345,888348,889645,891323-891332,892228,896674,896692-896693,900919,900943,902231,907851,915866

Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=916691&r1=916690&r2=916691&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Fri Feb 26 14:34:53 2010
@@ -115,6 +115,11 @@
         _logger.info("Close connection");
         con.close();
     }
+    
+    public void testDurabilityNOACK() throws Exception
+    {
+        durabilityImpl(AMQSession.NO_ACKNOWLEDGE);
+    }
 
     public void testDurabilityAUTOACK() throws Exception
     {
@@ -138,49 +143,78 @@
         Session session1 = con.createSession(false, ackMode);
         MessageConsumer consumer1 = session1.createConsumer(topic);
 
-        Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        Session sessionProd = con.createSession(false, ackMode);
         MessageProducer producer = sessionProd.createProducer(topic);
 
-        Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        Session session2 = con.createSession(false, ackMode);
         TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
 
         con.start();
 
+        //send message A and check both consumers receive
         producer.send(session1.createTextMessage("A"));
 
         Message msg;
+        _logger.info("Receive message on consumer 1 :expecting A");
         msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Message should have been received",msg);
         assertEquals("A", ((TextMessage) msg).getText());
         msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
+        _logger.info("Receive message on consumer 2 :expecting A");
         msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Message should have been received",msg);
         assertEquals("A", ((TextMessage) msg).getText());
         msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
-        consumer2.close();
-        session2.close();
-
+        //send message B, receive with consumer 1, and disconnect consumer 2 to leave the message behind (if not NO_ACK)
         producer.send(session1.createTextMessage("B"));
 
         _logger.info("Receive message on consumer 1 :expecting B");
         msg = consumer1.receive(500);
         assertNotNull("Consumer 1 should get message 'B'.", msg);
-        assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText());
+        assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
         msg = consumer1.receive(500);
         assertNull("There should be no more messages for consumption on consumer1.", msg);
 
+        consumer2.close();
+        session2.close();
+        
+        //Send message C, then connect consumer 3 to durable subscription and get
+        //message B if not using NO_ACK, then receive C with consumer 1 and 3
+        producer.send(session1.createTextMessage("C"));
+
         Session session3 = con.createSession(false, ackMode);
         MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
 
-        _logger.info("Receive message on consumer 3 :expecting B");
+        if(ackMode == AMQSession.NO_ACKNOWLEDGE)
+        {
+            //Do nothing if NO_ACK was used, as prefetch means the message was dropped
+            //when we didn't call receive() to get it before closing consumer 2
+        }
+        else
+        {
+            _logger.info("Receive message on consumer 3 :expecting B");
+            msg = consumer3.receive(500);
+            assertNotNull("Consumer 3 should get message 'B'.", msg);
+            assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText());
+        }
+
+        _logger.info("Receive message on consumer 1 :expecting C");
+        msg = consumer1.receive(500);
+        assertNotNull("Consumer 1 should get message 'C'.", msg);
+        assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText());
+        _logger.info("Receive message on consumer 1 :expecting null");
+        msg = consumer1.receive(500);
+        assertNull("There should be no more messages for consumption on consumer1.", msg);
+
+        _logger.info("Receive message on consumer 3 :expecting C");
         msg = consumer3.receive(500);
-        assertNotNull("Consumer 3 should get message 'B'.", msg);
-        assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
+        assertNotNull("Consumer 3 should get message 'C'.", msg);
+        assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 3 :expecting null");
         msg = consumer3.receive(500);
         assertNull("There should be no more messages for consumption on consumer3.", msg);
@@ -211,7 +245,7 @@
         con1.start();
         Session session1 = con1.createSession(false, ackMode);
 
-        MessageConsumer consumer1 = session0.createConsumer(topic);
+        MessageConsumer consumer1 = session1.createConsumer(topic);
 
         // Create consumer 2.
         AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
@@ -232,37 +266,60 @@
         msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Message should have been received",msg);
         assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
-        msg = consumer2.receive(500);
+        msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNull("There should be no more messages for consumption on consumer2.", msg);
 
+        // Send message and receive on consumer 1.
+        producer.send(session0.createTextMessage("B"));
+
+        _logger.info("Receive message on consumer 1 :expecting B");
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertEquals("B", ((TextMessage) msg).getText());
+        _logger.info("Receive message on consumer 1 :expecting null");
+        msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
+        assertEquals(null, msg);
+        
         // Detach the durable subscriber.
         consumer2.close();
         session2.close();
         con2.close();
+        
+        // Send message C and receive on consumer 1
+        producer.send(session0.createTextMessage("C"));
 
-        // Send message and receive on open consumer.
-        producer.send(session0.createTextMessage("B"));
-
-        _logger.info("Receive message on consumer 1 :expecting B");
-        msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
-        assertEquals("B", ((TextMessage) msg).getText());
+        _logger.info("Receive message on consumer 1 :expecting C");
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertEquals("C", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
         msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
-        // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
+        // Re-attach a new consumer to the durable subscription, and check that it gets message B it left (if not NO_ACK)
+        // and also gets message C sent after it was disconnected.
         AMQConnection con3 = (AMQConnection) getConnection("guest", "guest");
         con3.start();
         Session session3 = con3.createSession(false, ackMode);
 
         TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
 
-        _logger.info("Receive message on consumer 3 :expecting B");
-        msg = consumer3.receive(500);
-        assertNotNull("Consumer 3 should get message 'B'.", msg);
-        assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
+        if(ackMode == AMQSession.NO_ACKNOWLEDGE)
+        {
+            //Do nothing if NO_ACK was used, as prefetch means the message was dropped
+            //when we didn't call receive() to get it before closing consumer 2
+        }
+        else
+        {
+            _logger.info("Receive message on consumer 3 :expecting B");
+            msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
+            assertEquals("B", ((TextMessage) msg).getText());
+        }
+        
+        _logger.info("Receive message on consumer 3 :expecting C");
+        msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertNotNull("Consumer 3 should get message 'C'.", msg);
+        assertEquals("Incorrect Message recevied on consumer3.", "C", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 3 :expecting null");
-        msg = consumer3.receive(500);
+        msg = consumer3.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNull("There should be no more messages for consumption on consumer3.", msg);
 
         consumer1.close();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message