qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r450504 - in /incubator/qpid/trunk/qpid: cpp/broker/inc/Message.h cpp/broker/src/Channel.cpp cpp/broker/src/Message.cpp python/tests/basic.py python/tests/broker.py
Date Wed, 27 Sep 2006 16:44:02 GMT
Author: gsim
Date: Wed Sep 27 09:44:02 2006
New Revision: 450504

URL: http://svn.apache.org/viewvc?view=rev&rev=450504
Log:
Moved ack tests to basic class, added test for requeueing on recovery.
Implemented requeuing on recovery.


Modified:
    incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h
    incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp
    incubator/qpid/trunk/qpid/python/tests/basic.py
    incubator/qpid/trunk/qpid/python/tests/broker.py

Modified: incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h?view=diff&rev=450504&r1=450503&r2=450504
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h Wed Sep 27 09:44:02 2006
@@ -39,6 +39,7 @@
             string routingKey;
             const bool mandatory;
             const bool immediate;
+            bool redelivered;
             qpid::framing::AMQHeaderBody::shared_ptr header;
             content_list content;
 
@@ -61,6 +62,7 @@
             void deliver(qpid::framing::OutputHandler* out, int channel, 
                          string& consumerTag, u_int64_t deliveryTag, 
                          u_int32_t framesize);
+            void redeliver();
 
             friend bool route(Message::shared_ptr& msg, ExchangeRegistry* registry);
 

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp?view=diff&rev=450504&r1=450503&r2=450504
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp Wed Sep 27 09:44:02 2006
@@ -173,7 +173,6 @@
 
 void Channel::recover(bool requeue){
     if(requeue){
-        //TODO: need to set redelivered flag
         for_each(unacknowledged.begin(), unacknowledged.end(), Requeue());
         unacknowledged.clear();
     }else{
@@ -188,6 +187,7 @@
 }
 
 void Channel::Requeue::operator()(AckRecord& record) const{
+    record.msg->redeliver();
     record.queue->deliver(record.msg);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp?view=diff&rev=450504&r1=450503&r2=450504
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp Wed Sep 27 09:44:02 2006
@@ -32,7 +32,8 @@
                                                      exchange(_exchange),
                                                      routingKey(_routingKey), 
                                                      mandatory(_mandatory),
-                                                     immediate(_immediate){
+                                                     immediate(_immediate),
+                                                     redelivered(false){
 
 }
 
@@ -51,11 +52,15 @@
     return header.get() && (header->getContentSize() == contentSize());
 }
 
+void Message::redeliver(){
+    redelivered = true;
+}
+
 void Message::deliver(OutputHandler* out, int channel, 
                       string& consumerTag, u_int64_t deliveryTag, 
                       u_int32_t framesize){
 
-    out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, false,
exchange, routingKey)));
+    out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered,
exchange, routingKey)));
     AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
     out->send(new AMQFrame(channel, headerBody));
     for(content_iterator i = content.begin(); i != content.end(); i++){

Modified: incubator/qpid/trunk/qpid/python/tests/basic.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/basic.py?view=diff&rev=450504&r1=450503&r2=450504
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/basic.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/basic.py Wed Sep 27 09:44:02 2006
@@ -137,3 +137,104 @@
         #cancellation of non-existant consumers should be handled without error
         channel.basic_cancel(consumer_tag="my-consumer")
         channel.basic_cancel(consumer_tag="this-never-existed")
+
+
+    def test_basic_ack(self):
+        """
+        Test basic ack/recover behaviour
+        """
+        channel = self.channel
+        channel.queue_declare(queue="test-ack-queue")
+        
+        reply = channel.basic_consume(queue="test-ack-queue", no_ack=False)
+        queue = self.client.queue(reply.consumer_tag)
+
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("One"))
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("Two"))
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("Three"))
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("Four"))
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("Five"))
+                
+        msg1 = queue.get(timeout=1)
+        msg2 = queue.get(timeout=1)
+        msg3 = queue.get(timeout=1)
+        msg4 = queue.get(timeout=1)
+        msg5 = queue.get(timeout=1)
+        
+        self.assertEqual("One", msg1.content.body)
+        self.assertEqual("Two", msg2.content.body)
+        self.assertEqual("Three", msg3.content.body)
+        self.assertEqual("Four", msg4.content.body)
+        self.assertEqual("Five", msg5.content.body)
+
+        channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True)  #One & Two
+        channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
+
+        channel.basic_recover(requeue=False)
+        
+        msg3b = queue.get(timeout=1)
+        msg5b = queue.get(timeout=1)
+        
+        self.assertEqual("Three", msg3b.content.body)
+        self.assertEqual("Five", msg5b.content.body)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message: " + extra.content.body)
+        except Empty: None
+
+    def test_basic_recover_requeue(self):
+        """
+        Test requeing on recovery
+        """
+        channel = self.channel
+        channel.queue_declare(queue="test-requeue")
+        
+        subscription = channel.basic_consume(queue="test-requeue", no_ack=False)
+        queue = self.client.queue(subscription.consumer_tag)
+
+        channel.basic_publish(routing_key="test-requeue", content=Content("One"))
+        channel.basic_publish(routing_key="test-requeue", content=Content("Two"))
+        channel.basic_publish(routing_key="test-requeue", content=Content("Three"))
+        channel.basic_publish(routing_key="test-requeue", content=Content("Four"))
+        channel.basic_publish(routing_key="test-requeue", content=Content("Five"))
+                
+        msg1 = queue.get(timeout=1)
+        msg2 = queue.get(timeout=1)
+        msg3 = queue.get(timeout=1)
+        msg4 = queue.get(timeout=1)
+        msg5 = queue.get(timeout=1)
+        
+        self.assertEqual("One", msg1.content.body)
+        self.assertEqual("Two", msg2.content.body)
+        self.assertEqual("Three", msg3.content.body)
+        self.assertEqual("Four", msg4.content.body)
+        self.assertEqual("Five", msg5.content.body)
+
+        channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True)  #One & Two
+        channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
+
+        channel.basic_cancel(consumer_tag=subscription.consumer_tag)
+        subscription2 = channel.basic_consume(queue="test-requeue")
+        queue2 = self.client.queue(subscription2.consumer_tag)
+
+        channel.basic_recover(requeue=True)
+        
+        msg3b = queue2.get(timeout=1)
+        msg5b = queue2.get(timeout=1)
+        
+        self.assertEqual("Three", msg3b.content.body)
+        self.assertEqual("Five", msg5b.content.body)
+
+        self.assertTrue(msg3b.redelivered)
+        self.assertTrue(msg5b.redelivered)
+
+        try:
+            extra = queue2.get(timeout=1)
+            self.fail("Got unexpected message in second queue: " + extra.content.body)
+        except Empty: None
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in original queue: " + extra.content.body)
+        except Empty: None
+        

Modified: incubator/qpid/trunk/qpid/python/tests/broker.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/broker.py?view=diff&rev=450504&r1=450503&r2=450504
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/broker.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/broker.py Wed Sep 27 09:44:02 2006
@@ -101,47 +101,3 @@
         except Closed, e:
             self.assertConnectionException(504, e.args[0])
 
-
-    def test_acknowledgement(self):
-        """
-        Test basic ack/recover behaviour
-        """
-        channel = self.channel
-        channel.queue_declare(queue="test-ack-queue")
-        
-        reply = channel.basic_consume(queue="test-ack-queue", no_ack=False)
-        queue = self.client.queue(reply.consumer_tag)
-
-        channel.basic_publish(routing_key="test-ack-queue", content=Content("One"))
-        channel.basic_publish(routing_key="test-ack-queue", content=Content("Two"))
-        channel.basic_publish(routing_key="test-ack-queue", content=Content("Three"))
-        channel.basic_publish(routing_key="test-ack-queue", content=Content("Four"))
-        channel.basic_publish(routing_key="test-ack-queue", content=Content("Five"))
-                
-        msg1 = queue.get(timeout=1)
-        msg2 = queue.get(timeout=1)
-        msg3 = queue.get(timeout=1)
-        msg4 = queue.get(timeout=1)
-        msg5 = queue.get(timeout=1)
-        
-        self.assertEqual("One", msg1.content.body)
-        self.assertEqual("Two", msg2.content.body)
-        self.assertEqual("Three", msg3.content.body)
-        self.assertEqual("Four", msg4.content.body)
-        self.assertEqual("Five", msg5.content.body)
-
-        channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True)  #One & Two
-        channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
-
-        channel.basic_recover(requeue=False)
-        
-        msg3b = queue.get(timeout=1)
-        msg5b = queue.get(timeout=1)
-        
-        self.assertEqual("Three", msg3b.content.body)
-        self.assertEqual("Five", msg5b.content.body)
-
-        try:
-            extra = queue.get(timeout=1)
-            self.fail("Got unexpected message: " + extra.content.body)
-        except Empty: None



Mime
View raw message