Author: gsim
Date: Fri Oct 5 10:28:48 2007
New Revision: 582353

URL: http://svn.apache.org/viewvc?rev=582353&view=rev
Log:
Don't recover messages for cancelled subscriptions.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=582353&r1=582352&r2=582353&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Oct 5 10:28:48 2007
@@ -39,18 +39,21 @@
     id(_id),
     acquired(_acquired),
     confirmed(_confirmed),
-    pull(false)
+    pull(false),
+    cancelled(false)
 {
 }
 
 DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
                                Queue::shared_ptr _queue,
                                const DeliveryId _id) : msg(_msg),
-                                                       queue(_queue),
-                                                       id(_id),
-                                                       acquired(true),
-                                                       confirmed(false),
-                                                       pull(true){}
+    queue(_queue),
+    id(_id),
+    acquired(true),
+    confirmed(false),
+    pull(true),
+    cancelled(false)
+{}
 
 
 void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
@@ -76,7 +79,7 @@
 }
 
 void DeliveryRecord::redeliver(SemanticState* const session) {
-    if (!confirmed) {
+    if (!confirmed && !cancelled) {
         if(pull){
             //if message was originally sent as response to get, we must requeue it
             requeue();
@@ -147,6 +150,12 @@
     } else {
         QPID_LOG(info, "Message already acquired " << id.getValue());
     }
+}
+
+void DeliveryRecord::cancel(const std::string& cancelledTag)
+{
+    if (tag == cancelledTag)
+        cancelled = true;
 }
 
 namespace qpid {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=582353&r1=582352&r2=582353&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Oct 5 10:28:48 2007
@@ -49,6 +49,7 @@
     bool acquired;
     const bool confirmed;
     const bool pull;
+    bool cancelled;
 
 public:
     DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token,
@@ -63,6 +64,7 @@
     void requeue() const;
     void release();
     void reject();
+    void cancel(const std::string& tag);
     void redeliver(SemanticState* const);
     void updateByteCredit(uint32_t& credit) const;
     void addTo(Prefetch&) const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=582353&r1=582352&r2=582353&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Oct 5 10:28:48 2007
@@ -95,8 +95,14 @@
     // consumers is a ptr_map so erase will delete the consumer
     // which will call cancel.
     ConsumerImplMap::iterator i = consumers.find(tag);
-    if (i != consumers.end())
+    if (i != consumers.end()) {
         consumers.erase(i);
+        //should cancel all unacked messages for this consumer so that
+        //they are not redelivered on recovery
+        Mutex::ScopedLock locker(deliveryLock);
+        for_each(unacked.begin(), unacked.end(), boost::bind(mem_fun_ref(&DeliveryRecord::cancel), _1, tag));
+
+    }
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h?rev=582353&r1=582352&r2=582353&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h Fri Oct 5 10:28:48 2007
@@ -75,6 +75,11 @@
         return method;
     }
 
+    const framing::SequenceNumber& getId() const
+    {
+        return id;
+    }
+
 private:
     //method and id are only set for received messages:
     const framing::MessageTransferBody method;