qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r581869 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: broker/DeliveryRecord.h broker/MessageHandlerImpl.cpp broker/MessageHandlerImpl.h broker/SemanticState.cpp framing/SequenceNumberSet.h
Date Thu, 04 Oct 2007 12:01:29 GMT
Author: gsim
Date: Thu Oct  4 05:01:28 2007
New Revision: 581869

URL: http://svn.apache.org/viewvc?rev=581869&view=rev
Log:
Fix (and refactor) processing of ranges in message handler.
Alter release() to push released messages onto head in reverse order (todo: make this atomic
instead)


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=581869&r1=581868&r2=581869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Thu Oct  4 05:01:28 2007
@@ -75,6 +75,7 @@
     friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
 };
 
+typedef std::list<DeliveryRecord> DeliveryRecords; 
 typedef std::list<DeliveryRecord>::iterator ack_iterator; 
 
 struct AckRange

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=581869&r1=581868&r2=581869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Thu Oct  4 05:01:28
2007
@@ -29,13 +29,18 @@
 
 #include <boost/format.hpp>
 #include <boost/cast.hpp>
+#include <boost/bind.hpp>
 
 namespace qpid {
 namespace broker {
 
 using namespace framing;
 
-MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerImpl(s) {}
+MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : 
+    HandlerImpl(s),
+    releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)),
+    rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2))
+ {}
 
 //
 // Message class method handlers
@@ -86,52 +91,18 @@
 }
 
 void
-MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
-                            const string& queueName,
-                            const string& destination,
-                            bool noLocal,
-                            u_int8_t confirmMode,
-                            u_int8_t acquireMode,
-                            bool exclusive,
-                            const framing::FieldTable& filter )
-{
-    Queue::shared_ptr queue = state.getQueue(queueName);
-    if(!destination.empty() && state.exists(destination))
-        throw ConnectionException(530, "Consumer tags must be unique");
-
-    string tag = destination;
-    //NB: am assuming pre-acquired = 0 as discussed on SIG list as is
-    //the previously expected behaviour
-    state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),

-                    tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
-    // Dispatch messages as there is now a consumer.
-    queue->requestDispatch();
-}
-
-
-void
 MessageHandlerImpl::get(uint16_t /*ticket*/,
-                         const string& queueName,
-                         const string& destination,
-                         bool noAck )
+                        const string& /*queueName*/,
+                        const string& /*destination*/,
+                        bool /*noAck*/ )
 {
-    Queue::shared_ptr queue = state.getQueue(queueName);
-    
-    if (state.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0),
queue, !noAck)){
-        //don't send any response... rely on execution completion
-    } else {
-        //temporarily disabled:
-        //client.empty();
-    }
+    throw ConnectionException(540, "get no longer supported");
 }
 
 void
 MessageHandlerImpl::empty()
 {
-    // Shouldn't ever receive this as it is a response to get
-    // which is never sent
-    // TODO astitcher 2007-02-09 What is the correct exception to throw here?
-    THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible");
+    throw ConnectionException(540, "empty no longer supported");
 }
 
 void
@@ -151,6 +122,27 @@
 }
 
 void
+MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
+                            const string& queueName,
+                            const string& destination,
+                            bool noLocal,
+                            u_int8_t confirmMode,
+                            u_int8_t acquireMode,
+                            bool exclusive,
+                            const framing::FieldTable& filter )
+{
+    Queue::shared_ptr queue = state.getQueue(queueName);
+    if(!destination.empty() && state.exists(destination))
+        throw ConnectionException(530, "Consumer tags must be unique");
+
+    string tag = destination;
+    state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),

+                    tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
+    // Dispatch messages as there is now a consumer.
+    queue->requestDispatch();
+}
+
+void
 MessageHandlerImpl::recover(bool requeue)
 {
     state.recover(requeue);
@@ -159,13 +151,7 @@
 void
 MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const
string& /*text*/ )
 {
-    if (transfers.size() % 2) { //must be even number        
-        throw InvalidArgumentException("Received odd number of elements in list of transfers");
-    }
-    
-    for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++)
{
-        state.reject(i->getValue(), (++i)->getValue());
-    }
+    transfers.processRanges(rejectOp);
 }
 
 void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t
value)
@@ -209,29 +195,17 @@
 
 void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
 {
-    SequenceNumberSet results;
-
-    if (transfers.size() % 2) { //must be even number        
-        throw InvalidArgumentException("Received odd number of elements in list of transfers");
-    }
-    
-    for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++)
{
-        state.acquire(i->getValue(), (++i)->getValue(), results);
-    }
+    //TODO: implement mode
 
+    SequenceNumberSet results;
+    transfers.processRanges(boost::bind(&SemanticState::acquire, &state, _1, _2,
results));
     results = results.condense();
     getProxy().getMessage().acquired(results);
 }
 
 void MessageHandlerImpl::release(const SequenceNumberSet& transfers)
 {
-    if (transfers.size() % 2) { //must be even number        
-        throw InvalidArgumentException("Received odd number of elements in list of transfers");
-    }
-    
-    for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++)
{
-        state.release(i->getValue(), (++i)->getValue());
-    }
+    transfers.processRanges(releaseOp);
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h?rev=581869&r1=581868&r2=581869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h Thu Oct  4 05:01:28
2007
@@ -25,6 +25,8 @@
 #include "qpid/framing/AMQP_ClientProxy.h"
 #include "HandlerImpl.h"
 
+#include <boost/function.hpp>
+
 namespace qpid {
 namespace broker {
 
@@ -36,6 +38,10 @@
         public framing::AMQP_ServerOperations::MessageHandler,
         public HandlerImpl
 {
+    typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;    
+    RangedOperation releaseOp;
+    RangedOperation rejectOp;
+
   public:
     MessageHandlerImpl(SemanticState&);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=581869&r1=581868&r2=581869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Oct  4 05:01:28 2007
@@ -572,7 +572,11 @@
 {
     Mutex::ScopedLock locker(deliveryLock);
     AckRange range = findRange(first, last);
-    for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release));
+    //release results in the message being added to the head so want
+    //to release in reverse order to keep the original transfer order
+    DeliveryRecords::reverse_iterator start(range.end);
+    DeliveryRecords::reverse_iterator end(range.start);
+    for_each(start, end, mem_fun_ref(&DeliveryRecord::release));
 }
 
 void SemanticState::reject(DeliveryId first, DeliveryId last)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h?rev=581869&r1=581868&r2=581869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h Thu Oct  4 05:01:28
2007
@@ -26,6 +26,7 @@
 #include "amqp_types.h"
 #include "Buffer.h"
 #include "SequenceNumber.h"
+#include "qpid/framing/reply_exceptions.h"
 
 namespace qpid {
 namespace framing {
@@ -41,8 +42,23 @@
     uint32_t encodedSize() const;   
     SequenceNumberSet condense() const;
 
+    template <class T>
+    void processRanges(T t) const
+    {
+        if (size() % 2) { //must be even number        
+            throw InvalidArgumentException("SequenceNumberSet contains odd number of elements");
+        }
+    
+        for (SequenceNumberSet::const_iterator i = begin(); i != end(); i++) {
+            SequenceNumber first = i->getValue();
+            SequenceNumber last = (++i)->getValue();
+            t(first, last);
+        }
+    }
+
     friend std::ostream& operator<<(std::ostream&, const SequenceNumberSet&);
 };    
+
 
 }} // namespace qpid::framing
 



Mime
View raw message