Author: gsim Date: Thu Oct 4 05:01:28 2007 New Revision: 581869 URL: http://svn.apache.org/viewvc?rev=581869&view=rev Log: Fix (and refactor) processing of ranges in message handler. Alter release() to push released messages onto head in reverse order (todo: make this atomic instead) Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=581869&r1=581868&r2=581869&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Thu Oct 4 05:01:28 2007 @@ -75,6 +75,7 @@ friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&); }; +typedef std::list DeliveryRecords; typedef std::list::iterator ack_iterator; struct AckRange Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=581869&r1=581868&r2=581869&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Thu Oct 4 05:01:28 2007 @@ -29,13 +29,18 @@ #include #include +#include namespace qpid { namespace broker { using namespace framing; -MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerImpl(s) {} +MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : + HandlerImpl(s), + releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)), + rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)) + {} // // Message class method handlers @@ -86,52 +91,18 @@ } void -MessageHandlerImpl::subscribe(uint16_t /*ticket*/, - const string& queueName, - const string& destination, - bool noLocal, - u_int8_t confirmMode, - u_int8_t acquireMode, - bool exclusive, - const framing::FieldTable& filter ) -{ - Queue::shared_ptr queue = state.getQueue(queueName); - if(!destination.empty() && state.exists(destination)) - throw ConnectionException(530, "Consumer tags must be unique"); - - string tag = destination; - //NB: am assuming pre-acquired = 0 as discussed on SIG list as is - //the previously expected behaviour - state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), - tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); - // Dispatch messages as there is now a consumer. - queue->requestDispatch(); -} - - -void MessageHandlerImpl::get(uint16_t /*ticket*/, - const string& queueName, - const string& destination, - bool noAck ) + const string& /*queueName*/, + const string& /*destination*/, + bool /*noAck*/ ) { - Queue::shared_ptr queue = state.getQueue(queueName); - - if (state.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ - //don't send any response... rely on execution completion - } else { - //temporarily disabled: - //client.empty(); - } + throw ConnectionException(540, "get no longer supported"); } void MessageHandlerImpl::empty() { - // Shouldn't ever receive this as it is a response to get - // which is never sent - // TODO astitcher 2007-02-09 What is the correct exception to throw here? - THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible"); + throw ConnectionException(540, "empty no longer supported"); } void @@ -151,6 +122,27 @@ } void +MessageHandlerImpl::subscribe(uint16_t /*ticket*/, + const string& queueName, + const string& destination, + bool noLocal, + u_int8_t confirmMode, + u_int8_t acquireMode, + bool exclusive, + const framing::FieldTable& filter ) +{ + Queue::shared_ptr queue = state.getQueue(queueName); + if(!destination.empty() && state.exists(destination)) + throw ConnectionException(530, "Consumer tags must be unique"); + + string tag = destination; + state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), + tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); + // Dispatch messages as there is now a consumer. + queue->requestDispatch(); +} + +void MessageHandlerImpl::recover(bool requeue) { state.recover(requeue); @@ -159,13 +151,7 @@ void MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const string& /*text*/ ) { - if (transfers.size() % 2) { //must be even number - throw InvalidArgumentException("Received odd number of elements in list of transfers"); - } - - for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - state.reject(i->getValue(), (++i)->getValue()); - } + transfers.processRanges(rejectOp); } void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value) @@ -209,29 +195,17 @@ void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/) { - SequenceNumberSet results; - - if (transfers.size() % 2) { //must be even number - throw InvalidArgumentException("Received odd number of elements in list of transfers"); - } - - for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - state.acquire(i->getValue(), (++i)->getValue(), results); - } + //TODO: implement mode + SequenceNumberSet results; + transfers.processRanges(boost::bind(&SemanticState::acquire, &state, _1, _2, results)); results = results.condense(); getProxy().getMessage().acquired(results); } void MessageHandlerImpl::release(const SequenceNumberSet& transfers) { - if (transfers.size() % 2) { //must be even number - throw InvalidArgumentException("Received odd number of elements in list of transfers"); - } - - for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - state.release(i->getValue(), (++i)->getValue()); - } + transfers.processRanges(releaseOp); } }} // namespace qpid::broker Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h?rev=581869&r1=581868&r2=581869&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h Thu Oct 4 05:01:28 2007 @@ -25,6 +25,8 @@ #include "qpid/framing/AMQP_ClientProxy.h" #include "HandlerImpl.h" +#include + namespace qpid { namespace broker { @@ -36,6 +38,10 @@ public framing::AMQP_ServerOperations::MessageHandler, public HandlerImpl { + typedef boost::function RangedOperation; + RangedOperation releaseOp; + RangedOperation rejectOp; + public: MessageHandlerImpl(SemanticState&); Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=581869&r1=581868&r2=581869&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Oct 4 05:01:28 2007 @@ -572,7 +572,11 @@ { Mutex::ScopedLock locker(deliveryLock); AckRange range = findRange(first, last); - for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release)); + //release results in the message being added to the head so want + //to release in reverse order to keep the original transfer order + DeliveryRecords::reverse_iterator start(range.end); + DeliveryRecords::reverse_iterator end(range.start); + for_each(start, end, mem_fun_ref(&DeliveryRecord::release)); } void SemanticState::reject(DeliveryId first, DeliveryId last) Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h?rev=581869&r1=581868&r2=581869&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h Thu Oct 4 05:01:28 2007 @@ -26,6 +26,7 @@ #include "amqp_types.h" #include "Buffer.h" #include "SequenceNumber.h" +#include "qpid/framing/reply_exceptions.h" namespace qpid { namespace framing { @@ -41,8 +42,23 @@ uint32_t encodedSize() const; SequenceNumberSet condense() const; + template + void processRanges(T t) const + { + if (size() % 2) { //must be even number + throw InvalidArgumentException("SequenceNumberSet contains odd number of elements"); + } + + for (SequenceNumberSet::const_iterator i = begin(); i != end(); i++) { + SequenceNumber first = i->getValue(); + SequenceNumber last = (++i)->getValue(); + t(first, last); + } + } + friend std::ostream& operator<<(std::ostream&, const SequenceNumberSet&); }; + }} // namespace qpid::framing