qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r450434 - in /incubator/qpid/trunk/qpid: cpp/broker/inc/Channel.h cpp/broker/src/Channel.cpp cpp/broker/src/SessionHandlerImpl.cpp cpp/client/test/topic_listener.cpp python/tests/broker.py
Date Wed, 27 Sep 2006 13:21:44 GMT
Author: gsim
Date: Wed Sep 27 06:21:43 2006
New Revision: 450434

URL: http://svn.apache.org/viewvc?view=rev&rev=450434
Log:
Initial implementation of basic_ack & basic_recover


Modified:
    incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h
    incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/client/test/topic_listener.cpp
    incubator/qpid/trunk/qpid/python/tests/broker.py

Modified: incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h?view=diff&rev=450434&r1=450433&r2=450434
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h Wed Sep 27 06:21:43 2006
@@ -18,6 +18,7 @@
 #ifndef _Channel_
 #define _Channel_
 
+#include <algorithm>
 #include <map>
 #include "AMQContentBody.h"
 #include "AMQHeaderBody.h"
@@ -35,18 +36,53 @@
         class Channel{
         private:
             class ConsumerImpl : public virtual Consumer{
-                ConnectionToken* const connection;
                 Channel* parent;
                 string tag;
                 Queue::shared_ptr queue;
+                ConnectionToken* const connection;
+                const bool ackExpected;
             public:
-                ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken*
const connection);
+                ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken*
const connection, bool ack);
                 virtual bool deliver(Message::shared_ptr& msg);            
                 void cancel();
             };
 
             typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; 
 
+            struct AckRecord{
+                Message::shared_ptr msg;
+                Queue::shared_ptr queue;
+                string consumerTag;
+                u_int64_t deliveryTag;
+
+                AckRecord(Message::shared_ptr _msg, Queue::shared_ptr _queue, 
+                          string _consumerTag, u_int64_t _deliveryTag) : msg(_msg), 
+                                                                        queue(_queue), 
+                                                                        consumerTag(_consumerTag),
+                                                                        deliveryTag(_deliveryTag){}
+            };
+
+            typedef std::vector<AckRecord>::iterator ack_iterator; 
+
+            class MatchAck{
+                const u_int64_t tag;
+            public:
+                MatchAck(u_int64_t tag);
+                bool operator()(AckRecord& record) const;
+            };
+
+            class Requeue{
+            public:
+                void operator()(AckRecord& record) const;
+            };
+
+            class Redeliver{
+                Channel* const channel;
+            public:
+                Redeliver(Channel* const channel);
+                void operator()(AckRecord& record) const;
+            };
+
             const int id;
             qpid::framing::OutputHandler* out;
             u_int64_t deliveryTag;
@@ -58,8 +94,10 @@
             u_int32_t framesize;
             Message::shared_ptr message;
             NameGenerator tagGenerator;
+            std::vector<AckRecord> unacknowledged;
+            qpid::concurrent::MonitorImpl deliveryLock;
 
-            void deliver(Message::shared_ptr& msg, string& tag);            
+            void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr&
queue, bool ackExpected);            
             void publish(ExchangeRegistry* exchanges);
         
         public:
@@ -79,6 +117,8 @@
             void close();
             void commit();
             void rollback();
+            void ack(u_int64_t deliveryTag, bool multiple);
+            void recover(bool requeue);
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp?view=diff&rev=450434&r1=450433&r2=450434
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp Wed Sep 27 06:21:43 2006
@@ -25,6 +25,7 @@
 using namespace qpid::framing;
 using namespace qpid::concurrent;
 
+
 Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out), 
                                                                        id(_id), 
                                                                        framesize(_framesize),
@@ -46,7 +47,7 @@
 void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive,
ConnectionToken* const connection){
     if(tag.empty()) tag = tagGenerator.generate();
 
-    ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection));
+    ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
     try{
         queue->consume(c, exclusive);//may throw exception
         consumers[tag] = c;
@@ -92,22 +93,29 @@
 
 }
 
-void Channel::deliver(Message::shared_ptr& msg, string& consumerTag){
+void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr&
queue, bool ackExpected){
+    Locker locker(deliveryLock);
+
+    u_int64_t myDeliveryTag = deliveryTag++;
+    if(ackExpected){
+        unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag));
+    }
     //send deliver method, header and content(s)
-    msg->deliver(out, id, consumerTag, deliveryTag++, framesize);
+    msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
 }
 
 Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag, 
                                     Queue::shared_ptr _queue, 
-                                    ConnectionToken* const _connection) : parent(_parent),

-                                                                         tag(_tag), 
-                                                                         queue(_queue),
-                                                                         connection(_connection){
+                                    ConnectionToken* const _connection, bool ack) : parent(_parent),

+                                                                                    tag(_tag),

+                                                                                    queue(_queue),
+                                                                                    connection(_connection),
+                                                                                    ackExpected(ack){
 }
 
 bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
     if(connection != msg->getPublisher()){
-        parent->deliver(msg, tag);
+        parent->deliver(msg, tag, queue, ackExpected);
         return true;
     }else{
         return false;
@@ -150,4 +158,41 @@
         std::cout << "WARNING: Could not route message." << std::endl;
     }
     message.reset();
+}
+
+void Channel::ack(u_int64_t deliveryTag, bool multiple){
+    ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(deliveryTag));
+    if(i == unacknowledged.end()){
+        //error: how should this be signalled?
+    }else if(multiple){
+        unacknowledged.erase(unacknowledged.begin(), ++i);
+    }else{
+        unacknowledged.erase(i);
+    }
+}
+
+void Channel::recover(bool requeue){
+    if(requeue){
+        //TODO: need to set redelivered flag
+        for_each(unacknowledged.begin(), unacknowledged.end(), Requeue());
+        unacknowledged.clear();
+    }else{
+        for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this));        
+    }
+}
+
+Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {}
+
+bool Channel::MatchAck::operator()(AckRecord& record) const{
+    return tag == record.deliveryTag;
+}
+
+void Channel::Requeue::operator()(AckRecord& record) const{
+    record.queue->deliver(record.msg);
+}
+
+Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
+
+void Channel::Redeliver::operator()(AckRecord& record) const{
+    record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag,
channel->framesize);
 }

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp?view=diff&rev=450434&r1=450433&r2=450434
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp Wed Sep 27 06:21:43 2006
@@ -377,9 +377,13 @@
         
 void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string&
queue, bool noAck){} 
         
-void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag,
bool multiple){} 
+void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag,
bool multiple){
+    parent->getChannel(channel)->ack(deliveryTag, multiple);
+} 
         
 void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t channel, u_int64_t deliveryTag,
bool requeue){} 
         
-void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){} 
+void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
+    parent->getChannel(channel)->recover(requeue);
+} 
               

Modified: incubator/qpid/trunk/qpid/cpp/client/test/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/client/test/topic_listener.cpp?view=diff&rev=450434&r1=450433&r2=450434
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/client/test/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/client/test/topic_listener.cpp Wed Sep 27 06:21:43 2006
@@ -129,7 +129,7 @@
     report << "Received " << count << " messages in " << time <<
" ms.";
     Message msg;
     msg.setData(report.str());
-    channel->publish(msg, Exchange::DEFAULT_DIRECT_EXCHANGE, responseQueue);
+    channel->publish(msg, string(""), responseQueue);
     if(transactional){
         channel->commit();
     }

Modified: incubator/qpid/trunk/qpid/python/tests/broker.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/broker.py?view=diff&rev=450434&r1=450433&r2=450434
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/broker.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/broker.py Wed Sep 27 06:21:43 2006
@@ -100,3 +100,48 @@
             self.fail("Expected error on queue_declare for closed channel")
         except Closed, e:
             self.assertConnectionException(504, e.args[0])
+
+
+    def test_acknowledgement(self):
+        """
+        Test basic ack/recover behaviour
+        """
+        channel = self.channel
+        channel.queue_declare(queue="test-ack-queue")
+        
+        reply = channel.basic_consume(queue="test-ack-queue", no_ack=False)
+        queue = self.client.queue(reply.consumer_tag)
+
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("One"))
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("Two"))
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("Three"))
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("Four"))
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("Five"))
+                
+        msg1 = queue.get(timeout=1)
+        msg2 = queue.get(timeout=1)
+        msg3 = queue.get(timeout=1)
+        msg4 = queue.get(timeout=1)
+        msg5 = queue.get(timeout=1)
+        
+        self.assertEqual("One", msg1.content.body)
+        self.assertEqual("Two", msg2.content.body)
+        self.assertEqual("Three", msg3.content.body)
+        self.assertEqual("Four", msg4.content.body)
+        self.assertEqual("Five", msg5.content.body)
+
+        channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True)  #One & Two
+        channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
+
+        channel.basic_recover(requeue=False)
+        
+        msg3b = queue.get(timeout=1)
+        msg5b = queue.get(timeout=1)
+        
+        self.assertEqual("Three", msg3b.content.body)
+        self.assertEqual("Five", msg5b.content.body)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message: " + extra.content.body)
+        except Empty: None



Mime
View raw message