qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r582353 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: broker/DeliveryRecord.cpp broker/DeliveryRecord.h broker/SemanticState.cpp client/Message.h
Date Fri, 05 Oct 2007 17:28:53 GMT
Author: gsim
Date: Fri Oct  5 10:28:48 2007
New Revision: 582353

URL: http://svn.apache.org/viewvc?rev=582353&view=rev
Log:
Don't recover messages for cancelled subscriptions.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=582353&r1=582352&r2=582353&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Oct  5 10:28:48 2007
@@ -39,18 +39,21 @@
                                                                   id(_id),
                                                                   acquired(_acquired),
                                                                   confirmed(_confirmed),
-                                                                  pull(false)
+                                                                  pull(false), 
+                                                                  cancelled(false)
 {
 }
 
 DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, 
                                Queue::shared_ptr _queue, 
                                const DeliveryId _id) : msg(_msg), 
-                                                                queue(_queue), 
-                                                                id(_id),
-                                                                acquired(true),
-                                                                confirmed(false),
-                                                                pull(true){}
+                                                       queue(_queue), 
+                                                       id(_id),
+                                                       acquired(true),
+                                                       confirmed(false),
+                                                       pull(true),
+                                                       cancelled(false)
+{}
 
 
 void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
@@ -76,7 +79,7 @@
 }
 
 void DeliveryRecord::redeliver(SemanticState* const session) {
-    if (!confirmed) {
+    if (!confirmed && !cancelled) {
         if(pull){
             //if message was originally sent as response to get, we must requeue it
             requeue();
@@ -147,6 +150,12 @@
     } else {
         QPID_LOG(info, "Message already acquired " << id.getValue());
     }
+}
+
+void DeliveryRecord::cancel(const std::string& cancelledTag) 
+{
+    if (tag == cancelledTag)
+        cancelled = true;
 }
 
 namespace qpid {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=582353&r1=582352&r2=582353&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Oct  5 10:28:48 2007
@@ -49,6 +49,7 @@
     bool acquired;
     const bool confirmed;
     const bool pull;
+    bool cancelled;
 
   public:
     DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string
tag, DeliveryToken::shared_ptr token, 
@@ -63,6 +64,7 @@
     void requeue() const;
     void release();
     void reject();
+    void cancel(const std::string& tag);
     void redeliver(SemanticState* const);
     void updateByteCredit(uint32_t& credit) const;
     void addTo(Prefetch&) const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=582353&r1=582352&r2=582353&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Oct  5 10:28:48 2007
@@ -95,8 +95,14 @@
     // consumers is a ptr_map so erase will delete the consumer
     // which will call cancel.
     ConsumerImplMap::iterator i = consumers.find(tag);
-    if (i != consumers.end())
+    if (i != consumers.end()) {
         consumers.erase(i); 
+        //should cancel all unacked messages for this consumer so that
+        //they are not redelivered on recovery
+        Mutex::ScopedLock locker(deliveryLock);   
+        for_each(unacked.begin(), unacked.end(), boost::bind(mem_fun_ref(&DeliveryRecord::cancel),
_1, tag));
+        
+    }
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h?rev=582353&r1=582352&r2=582353&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h Fri Oct  5 10:28:48 2007
@@ -75,6 +75,11 @@
         return method;
     }
 
+    const framing::SequenceNumber& getId() const
+    {
+        return id;
+    }
+
 private:
     //method and id are only set for received messages:
     const framing::MessageTransferBody method;



Mime
View raw message