qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1130291 - in /qpid/branches/qpid-3079/qpid/cpp/src: qpid/broker/ tests/
Date Wed, 01 Jun 2011 20:22:38 GMT
Author: kgiusti
Date: Wed Jun  1 20:22:37 2011
New Revision: 1130291

URL: http://svn.apache.org/viewvc?rev=1130291&view=rev
Log:
Initial design attempt.

Modified:
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/AsyncCompletion.h
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Message.h
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStore.h
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStoreModule.h
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/NullMessageStore.h
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableMessage.h
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableQueue.h
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionContext.h
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/branches/qpid-3079/qpid/cpp/src/tests/QueueTest.cpp
    qpid/branches/qpid-3079/qpid/cpp/src/tests/TestMessageStore.h

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/AsyncCompletion.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/AsyncCompletion.h?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/AsyncCompletion.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/AsyncCompletion.h Wed Jun  1 20:22:37 2011
@@ -23,6 +23,7 @@
  */
 
 #include <boost/intrusive_ptr.hpp>
+#include <boost/noncopyable.hpp>
 
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/sys/AtomicValue.h"
@@ -77,7 +78,7 @@ namespace broker {
  * assuming no need for synchronization with Completer threads.
  */
 
-class AsyncCompletion
+class AsyncCompletion : private boost::noncopyable
 {
  public:
 
@@ -88,7 +89,7 @@ class AsyncCompletion
      * callback object will be used by the last completer thread, and
      * released when the callback returns.
      */
-    class Callback : public RefCounted
+    class Callback : virtual public RefCounted
     {
   public:
         virtual void completed(bool) = 0;

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Wed Jun  1 20:22:37 2011
@@ -110,9 +110,10 @@ void DeliveryRecord::complete()  {
     completed = true; 
 }
 
-bool DeliveryRecord::accept(TransactionContext* ctxt) {
+/** Accept msg, and optionally notify caller when dequeue completes */
+bool DeliveryRecord::accept(TransactionContext* ctxt, Queue::DequeueDoneCallbackFactory *f) {
     if (acquired && !ended) {
-        queue->dequeue(ctxt, msg);
+        queue->dequeue(ctxt, msg, f);
         setEnded();
         QPID_LOG(debug, "Accepted " << id);
     }

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/DeliveryRecord.h Wed Jun  1 20:22:37 2011
@@ -31,6 +31,7 @@
 #include "qpid/broker/QueuedMessage.h"
 #include "qpid/broker/DeliveryId.h"
 #include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
 
 namespace qpid {
 namespace broker {
@@ -39,6 +40,7 @@ class TransactionContext;
 class SemanticState;
 struct AckRange;
 
+
 /**
  * Record of a delivery for which an ack is outstanding.
  */
@@ -84,7 +86,7 @@ class DeliveryRecord
     void redeliver(SemanticState* const);
     void acquire(DeliveryIds& results);
     void complete();
-    bool accept(TransactionContext* ctxt); // Returns isRedundant()
+    bool accept(TransactionContext*, Queue::DequeueDoneCallbackFactory *);  // Returns isRedundant()
     bool setEnded();            // Returns isRedundant()
     void committed() const;
 

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Message.cpp?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Message.cpp Wed Jun  1 20:22:37 2011
@@ -419,11 +419,6 @@ struct ScopedSet {
 };
 }
 
-void Message::allDequeuesComplete() {
-    ScopedSet ss(callbackLock, inCallback);
-    MessageCallback* cb = dequeueCallback;
-    if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
-}
 
 void Message::setDequeueCompleteCallback(MessageCallback& cb) {
     sys::Mutex::ScopedLock l(callbackLock);

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Message.h?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Message.h Wed Jun  1 20:22:37 2011
@@ -163,7 +163,6 @@ public:
     void setIsManagementMessage(bool b);
   private:
     MessageAdapter& getAdapter() const;
-    void allDequeuesComplete();
 
     mutable sys::Mutex lock;
     framing::FrameSet frames;

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStore.h?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStore.h Wed Jun  1 20:22:37 2011
@@ -172,7 +172,7 @@ class MessageStore : public Transactiona
      */
     virtual void dequeue(TransactionContext* ctxt,
                          const boost::intrusive_ptr<PersistableMessage>& msg,
-                         const PersistableQueue& queue) = 0;
+                         const boost::shared_ptr<PersistableQueue>& queue) = 0;
 
     /**
      * Flushes all async messages to disk for the specified queue

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Wed Jun  1 20:22:37 2011
@@ -27,6 +27,7 @@
 #define TRANSFER_EXCEPTION(fn) try { fn; } catch (std::exception& e) { throw Exception(e.what()); }
 
 using boost::intrusive_ptr;
+using boost::shared_ptr;
 using qpid::framing::FieldTable;
 using std::string;
 
@@ -127,7 +128,7 @@ void MessageStoreModule::enqueue(Transac
 
 void MessageStoreModule::dequeue(TransactionContext* ctxt,
                                  const intrusive_ptr<PersistableMessage>& msg,
-                                 const PersistableQueue& queue)
+                                 const shared_ptr<PersistableQueue>& queue)
 {
     TRANSFER_EXCEPTION(store->dequeue(ctxt, msg, queue));
 }

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStoreModule.h?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStoreModule.h Wed Jun  1 20:22:37 2011
@@ -72,7 +72,7 @@ class MessageStoreModule : public Messag
                  const PersistableQueue& queue);
     void dequeue(TransactionContext* ctxt,
                  const boost::intrusive_ptr<PersistableMessage>& msg,
-                 const PersistableQueue& queue);
+                 const boost::shared_ptr<PersistableQueue>& queue);
     uint32_t outstandingQueueAIO(const PersistableQueue& queue);
     void flush(const qpid::broker::PersistableQueue& queue);
     bool isNull() const;

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Wed Jun  1 20:22:37 2011
@@ -28,6 +28,7 @@
 #include <iostream>
 
 using boost::intrusive_ptr;
+using boost::shared_ptr;
 
 namespace qpid{
 namespace broker{
@@ -103,9 +104,9 @@ void NullMessageStore::enqueue(Transacti
 
 void NullMessageStore::dequeue(TransactionContext*,
                                const intrusive_ptr<PersistableMessage>& msg,
-                               const PersistableQueue&)
+                               const shared_ptr<PersistableQueue>& q)
 {
-    msg->dequeueComplete();
+    q->dequeueComplete(msg);
 }
 
 void NullMessageStore::flush(const qpid::broker::PersistableQueue&) {}

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/NullMessageStore.h Wed Jun  1 20:22:37 2011
@@ -28,6 +28,7 @@
 #include "qpid/sys/Mutex.h"
 
 #include <boost/intrusive_ptr.hpp>
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
 namespace broker {
@@ -84,7 +85,7 @@ class QPID_BROKER_CLASS_EXTERN NullMessa
                                             const PersistableQueue& queue);
     QPID_BROKER_EXTERN virtual void dequeue(TransactionContext* ctxt,
                                             const boost::intrusive_ptr<PersistableMessage>& msg,
-                                            const PersistableQueue& queue);
+                                            const boost::shared_ptr<PersistableQueue>& queue);
     QPID_BROKER_EXTERN virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue);
     QPID_BROKER_EXTERN virtual void flush(const qpid::broker::PersistableQueue& queue);
     ~NullMessageStore(){}

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Wed Jun  1 20:22:37 2011
@@ -34,7 +34,6 @@ class MessageStore;
 PersistableMessage::~PersistableMessage() {}
 
 PersistableMessage::PersistableMessage() :
-    asyncDequeueCounter(0),
     store(0)
 {}
 
@@ -43,18 +42,18 @@ void PersistableMessage::flush()
     syncList copy;
     {
         sys::ScopedLock<sys::Mutex> l(storeLock);
-	if (store) {
-	    copy = synclist;
-	} else {
+        if (store) {
+            copy = synclist;
+        } else {
             return;//early exit as nothing to do
-	}
+        }
     }
     for (syncList::iterator i = copy.begin(); i != copy.end(); ++i) {
-        PersistableQueue::shared_ptr q(i->lock());
+        PersistableQueue::shared_ptr q(i->second.lock());
         if (q) {
             q->flush();
         }
-    } 
+    }
 }
 
 void PersistableMessage::setContentReleased()
@@ -70,8 +69,9 @@ bool PersistableMessage::isContentReleas
 
 bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
     if (store && (queue->getPersistenceId()!=0)) {
+        sys::ScopedLock<sys::Mutex> l(storeLock);
         for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
-            PersistableQueue::shared_ptr q(i->lock());
+            PersistableQueue::shared_ptr q(i->second.lock());
             if (q && q->getPersistenceId() == queue->getPersistenceId())  return true;
         } 
     }            
@@ -84,7 +84,7 @@ void PersistableMessage::addToSyncList(P
         sys::ScopedLock<sys::Mutex> l(storeLock);
         store = _store;
         boost::weak_ptr<PersistableQueue> q(queue);
-        synclist.push_back(q);
+        synclist[queue->getName()] = q;
     }
 }
 
@@ -93,37 +93,12 @@ void PersistableMessage::enqueueAsync(Pe
     enqueueStart();
 }
 
-bool PersistableMessage::isDequeueComplete() { 
-    sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
-    return asyncDequeueCounter == 0;
-}
-    
-void PersistableMessage::dequeueComplete() { 
-    bool notify = false;
-    {
-        sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
-        if (asyncDequeueCounter > 0) {
-            if (--asyncDequeueCounter == 0) {
-                notify = true;
-            }
-        }
-    }
-    if (notify) allDequeuesComplete();
-}
-
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { 
+void PersistableMessage::dequeueComplete(PersistableQueue::shared_ptr queue, MessageStore* _store)
+{
     if (_store){
         sys::ScopedLock<sys::Mutex> l(storeLock);
-        store = _store;
-        boost::weak_ptr<PersistableQueue> q(queue);
-        synclist.push_back(q);
+        synclist.erase(queue->getName());
     }
-    dequeueAsync();
-}
-
-void PersistableMessage::dequeueAsync() { 
-    sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
-    asyncDequeueCounter++; 
 }
 
 PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {}

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableMessage.h Wed Jun  1 20:22:37 2011
@@ -43,8 +43,7 @@ class MessageStore;
  */
 class PersistableMessage : public Persistable
 {
-    typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
-    sys::Mutex asyncDequeueLock;
+    typedef std::map< std::string, boost::weak_ptr<PersistableQueue> > syncList;
     sys::Mutex storeLock;
 
     /**
@@ -58,17 +57,6 @@ class PersistableMessage : public Persis
      */
     AsyncCompletion ingressCompletion;
 
-    /**
-     * Tracks the number of outstanding asynchronous dequeue
-     * operations. When the message is dequeued asynchronously the
-     * count is incremented; when that dequeue completes it is
-     * decremented. Thus when it is 0, there are no outstanding
-     * dequeues.
-     */
-    int asyncDequeueCounter;
-
-    void dequeueAsync();
-
     syncList synclist;
     struct ContentReleaseState
     {
@@ -81,8 +69,6 @@ class PersistableMessage : public Persis
     ContentReleaseState contentReleaseState;
 
   protected:
-    /** Called when all dequeues are complete for this message. */
-    virtual void allDequeuesComplete() = 0;
 
     void setContentReleased();
 
@@ -124,13 +110,10 @@ class PersistableMessage : public Persis
     QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
                                          MessageStore* _store);
 
+    QPID_BROKER_EXTERN void dequeueComplete(PersistableQueue::shared_ptr queue,
+                                            MessageStore* _store);
 
-    QPID_BROKER_EXTERN bool isDequeueComplete();
-    
-    QPID_BROKER_EXTERN void dequeueComplete();
-
-    QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
-                                         MessageStore* _store);
+    QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {}
 
     bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
     

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableQueue.h?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableQueue.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableQueue.h Wed Jun  1 20:22:37 2011
@@ -26,10 +26,12 @@
 #include "qpid/broker/Persistable.h"
 #include "qpid/management/Manageable.h"
 #include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
 
 namespace qpid {
 namespace broker {
 
+class PersistableMessage;
 
 /**
 * Empty class to be used by any module that wanted to set an external per queue store into
@@ -66,6 +68,9 @@ public:
     
     PersistableQueue():externalQueueStore(NULL){
     };
+
+    /** the message has finished being dequeued from the store */
+    virtual void dequeueComplete(const boost::intrusive_ptr<PersistableMessage>&) = 0;
     
 protected:
     ExternalQueueStore* externalQueueStore;

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp Wed Jun  1 20:22:37 2011
@@ -438,7 +438,8 @@ void Queue::purgeExpired()
             Mutex::ScopedLock locker(messageLock);
             messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
         }
-        for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+        for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1,
+                                                             (DequeueDoneCallbackFactory*)0));
     }
 }
 
@@ -613,7 +614,8 @@ bool Queue::enqueue(TransactionContext* 
             policy->getPendingDequeues(dequeues);
         }
         //depending on policy, may have some dequeues that need to performed without holding the lock
-        for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));        
+        for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1,
+                                                               (DequeueDoneCallbackFactory*)0));
     }
 
     if (inLastNodeFailure && persistLastNode){
@@ -652,8 +654,13 @@ void Queue::enqueueAborted(boost::intrus
     if (policy.get()) policy->enqueueAborted(msg);       
 }
 
-// return true if store exists, 
-bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
+/**
+ * returns false if the dequeue completed, otherwise the dequeue will complete
+ * asynchronously.  If the caller needs to know when an asynchronous dequeue
+ * completes, it must provide a factory that will provide the callback.
+ */
+bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg,
+                    DequeueDoneCallbackFactory *factory)
 {
     ScopedUse u(barrier);
     if (!u.acquired) return false;
@@ -670,9 +677,14 @@ bool Queue::dequeue(TransactionContext* 
     bool fp = msg.payload->isForcedPersistent();
     if (!fp || (fp && msg.payload->isStoredOnQueue(shared_from_this()))) {
         if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) {
-            msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+            msg.payload->dequeueAsync(shared_from_this(), store);
             boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
-            store->dequeue(ctxt, pmsg, *this);
+            if (factory) {
+                boost::shared_ptr<DequeueDoneCallback> callback((*factory)());
+                Mutex::ScopedLock locker(messageLock);
+                pendingDequeueCallbacks[pmsg.get()] = callback;
+            }
+            store->dequeue(ctxt, pmsg, shared_from_this());  // invokes Queue::dequeueComplete() when done
             return true;
         }
     }
@@ -1109,7 +1121,8 @@ void Queue::recoveryComplete(ExchangeReg
         }
     }
     //process any pending dequeues
-    for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+    for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1,
+                                                                         (DequeueDoneCallbackFactory*)0));
     pendingDequeues.clear();
 }
 
@@ -1198,6 +1211,27 @@ const Broker* Queue::getBroker()
 }
 
 
+/** invoked from the store thread when the asynchronous dequeueing of the
+ * message has completed. */
+void Queue::dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg )
+{
+    msg->dequeueComplete(shared_from_this(), store);
+
+    boost::shared_ptr<DequeueDoneCallback> cb;
+    {
+        Mutex::ScopedLock locker(messageLock);
+        std::map< PersistableMessage *, boost::shared_ptr<DequeueDoneCallback> >::iterator i;
+        i = pendingDequeueCallbacks.find(msg.get());
+        if (i != pendingDequeueCallbacks.end()) {
+            cb = i->second;
+            pendingDequeueCallbacks.erase(i);
+        }
+    }
+    if (cb) (*cb)();
+    QPID_LOG(error, "dequeueComplete:=" << cb);
+}
+
+
 Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
 
 bool Queue::UsageBarrier::acquire()

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h Wed Jun  1 20:22:37 2011
@@ -275,9 +275,30 @@ class Queue : public boost::enable_share
     bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false);
     void enqueueAborted(boost::intrusive_ptr<Message> msg);
     /**
-     * dequeue from store (only done once messages is acknowledged)
-     */
-    QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg);
+     * dequeue from store (only done once the message is acknowledged).  Dequeue
+     * -may- complete asynchronously. This method returns 'false' if the
+     * dequeue has completed, else 'true' if the dequeue is asynchronous.  If
+     * the caller is interested in receiving notification when the asynchronous
+     * dequeue completes, it may pass a pointer to a factory functor that
+     * returns a shareable DequeueDoneCallback object.  If the dequeue is
+     * completed synchronously, this pointer is ignored.  If the dequeue will
+     * complete asynchronously, the factory is called to obtain a
+     * DequeueDoneCallback.  When the dequeue completes, the
+     * DequeueDoneCallback is invoked. The callback should be prepared to
+     * execute on any thread.
+     */
+    class DequeueDoneCallback
+    {
+    public:
+        virtual void operator()() = 0;
+    };
+    typedef boost::function<boost::shared_ptr<DequeueDoneCallback>()> DequeueDoneCallbackFactory;
+    QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage& msg,
+                                    DequeueDoneCallbackFactory *factory = 0);
+
+    /** invoked by store to signal dequeue() has completed */
+    QPID_BROKER_EXTERN void dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg);
+
     /**
      * Inform the queue that a previous transactional dequeue
      * committed.
@@ -382,6 +403,9 @@ class Queue : public boost::enable_share
     void flush();
 
     const Broker* getBroker();
+
+ private:
+    std::map< PersistableMessage *, boost::shared_ptr<DequeueDoneCallback> > pendingDequeueCallbacks;
 };
 }
 }

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Jun  1 20:22:37 2011
@@ -761,7 +761,122 @@ isInSequenceSetAnd(const SequenceSet& s,
     return IsInSequenceSetAnd<Predicate>(s,p);
 }
 
+/** Design notes for asynchronous completion of Message.accept **
+ * A Message.Accept command cannot be considered "complete" until all messages
+ * identified by the sequence set that accompanies the command have been
+ * completely dequeued.  A message dequeue may not be able to complete
+ * synchronously - specifically when the message is durable.  Therefore, the
+ * Message.Accept command handling must be able to track asynchronous dequeues,
+ * and notify the Session when all dequeues are done (at which point the
+ * command completes).  See QPID-3079.
+ */
+namespace {
+
+    /** Manage a Message.accept that requires async completion of one or more
+     * message dequeue operations */
+    class AsyncMessageAcceptCmd : public SessionContext::AsyncCommandContext
+    {
+        mutable qpid::sys::Mutex lock;
+        std::set<DeliveryId> pending;    // for dequeue to complete
+        bool ready;
+        SemanticState& state;
+
+    public:
+        AsyncMessageAcceptCmd(SemanticState& _state)
+            : ready(false), state(_state) {}
+
+        /** called from session to urge pending dequeues to complete ASAP */
+        void flush()
+        {
+            std::set<DeliveryId> copy;
+            {
+                Mutex::ScopedLock l(lock);
+                copy = pending;
+            }
+            for (std::set<DeliveryId>::iterator i = copy.begin();
+                 i != copy.end(); ++i) {
+                for (DeliveryRecords::iterator r = state.getUnacked().begin();
+                     r != state.getUnacked().end(); ++r) {
+                    if (r->getId() == *i) {
+                        r->getQueue()->flush();
+                        break;
+                    }
+                }
+            }
+        }
+
+        /** add a pending dequeue to track */
+        void add( const DeliveryId& id )
+        {
+            Mutex::ScopedLock l(lock);
+            bool unique = pending.insert(id).second;
+            if (!unique) {
+                assert(false);
+            }
+        }
+
+        /** signal this dequeue done. Note: may be run in *any* thread */
+        void complete( const DeliveryId& id )
+        {
+            Mutex::ScopedLock l(lock);
+            std::set<DeliveryId>::iterator i = pending.find(id);
+            assert(i != pending.end());
+            pending.erase(i);
+
+            if (ready && pending.empty()) {
+                framing::Invoker::Result r;
+                Mutex::ScopedUnlock ul(lock);
+                completed( r );
+            }
+        }
+
+        /** allow the Message.Accept to complete */
+        void enable()
+        {
+            Mutex::ScopedLock l(lock);
+            if (pending.empty()) {
+                framing::Invoker::Result r;
+                Mutex::ScopedUnlock ul(lock);
+                completed( r );
+                return;
+            }
+            ready = true;
+        }
+    };
+
+
+    /** callback to indicate a single message has completed its dequeue.  This
+        object is made available to the queue, which will invoke it when the
+        dequeue completes. */
+    class DequeueDone : public Queue::DequeueDoneCallback
+    {
+        DeliveryId id;
+        boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd;
+    public:
+        DequeueDone( const DeliveryId & _id,
+                     boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd )
+            : id(_id), cmd(_cmd) {}
+        void operator()() { cmd->complete( id ); }
+    };
+
+
+    /** factory to create the above callback - passed to queue's dequeue
+        method, only called if dequeue is async! */
+    boost::shared_ptr<Queue::DequeueDoneCallback> factory( SemanticState *state,
+                                                           const DeliveryId& id,
+                                                           boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd )
+    {
+        if (!cmd) {     // first async dequeue creates the context
+            cmd.reset(new AsyncMessageAcceptCmd(*state));
+        }
+        cmd->add( id );
+        boost::shared_ptr<DequeueDone> x( new DequeueDone(id, cmd ) );
+        return x;
+    }
+}
+
 void SemanticState::accepted(const SequenceSet& commands) {
+    QPID_LOG(debug, "SemanticState::accepted (" << commands << ")");
     assertClusterSafe();
     if (txBuffer.get()) {
         //in transactional mode, don't dequeue or remove, just
@@ -785,12 +900,32 @@ void SemanticState::accepted(const Seque
             unacked.erase(removed, unacked.end());
         }
     } else {
-        DeliveryRecords::iterator removed =
-            remove_if(unacked.begin(), unacked.end(),
-                      isInSequenceSetAnd(commands,
-                                         bind(&DeliveryRecord::accept, _1,
-                                              (TransactionContext*) 0)));
-        unacked.erase(removed, unacked.end());
+        /** @todo KAG - the following code removes the command from unacked
+            even if the dequeue has not completed.  note that the command will
+            still not complete until all dequeues complete. I'm doing this to
+            avoid having to lock the unacked list, which would be necessary if
+            we remove when the dequeue completes. Is this ok? */
+        boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd;
+        DeliveryRecords undone;     // every record that has to stay on 'unacked'
+        for (DeliveryRecords::iterator i = unacked.begin(); i != unacked.end(); ++i) {
+            bool done = false;
+            if (i->coveredBy(&commands)) {
+                // boost::ref is required: bind stores copies of its arguments, so without
+                // it the factory would fill in a private copy and 'cmd' would stay null.
+                Queue::DequeueDoneCallbackFactory f =
+                    boost::bind(factory, this, i->getId(), boost::ref(cmd));
+                done = i->accept((TransactionContext*) 0, &f);
+            }
+            // keep records this accept does not cover, as well as those not yet finished
+            if (!done) undone.push_back(*i);
+        }
+        unacked.swap(undone);
+
+        if (cmd) {
+            boost::intrusive_ptr<SessionContext::AsyncCommandContext> pcmd(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cmd));
+            session.registerAsyncCommand(pcmd);
+            cmd->enable();
+        }
     }
 }
 

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionContext.h?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionContext.h Wed Jun  1 20:22:37 2011
@@ -25,6 +25,7 @@
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
 #include "qpid/framing/amqp_types.h"
+#include "qpid/framing/Invoker.h"
 #include "qpid/sys/OutputControl.h"
 #include "qpid/broker/ConnectionState.h"
 #include "qpid/broker/OwnershipToken.h"
@@ -37,7 +38,12 @@ namespace broker {
 
 class SessionContext : public OwnershipToken, public sys::OutputControl
 {
-  public:
+ protected:
+    class AsyncCommandManager;
+
+ public:
+    class AsyncCommandContext;
+
     virtual ~SessionContext(){}
     virtual bool isLocal(const ConnectionToken* t) const = 0;
     virtual bool isAttached() const = 0;
@@ -47,7 +53,52 @@ class SessionContext : public OwnershipT
     virtual uint16_t getChannel() const = 0;
     virtual const SessionId& getSessionId() const = 0;
     virtual void addPendingExecutionSync() = 0;
-};
+    // pass async command context to Session, completion must not occur
+    // until -after- this call returns.
+    virtual void registerAsyncCommand(boost::intrusive_ptr<AsyncCommandContext>&) = 0;
+
+    // class for commands that need to complete asynchronously; completed() hands the result to the manager
+    friend class AsyncCommandContext;
+    class AsyncCommandContext : virtual public RefCounted
+    {
+     private:
+        framing::SequenceNumber id;     // id of the tracked command, assigned through setId()
+        bool requiresAccept;            // stored via setRequiresAccept(); false until then
+        bool syncBitSet;                // stored via setSyncBitSet(); false until then
+        boost::intrusive_ptr<SessionContext::AsyncCommandManager> manager;  // notified by completed(), then released
+
+     public:
+        AsyncCommandContext() : id(0), requiresAccept(false), syncBitSet(false) {}
+        virtual ~AsyncCommandContext() {}
+
+        framing::SequenceNumber getId() const { return id; }
+        void setId(const framing::SequenceNumber seq) { id = seq; }
+        bool getRequiresAccept() const { return requiresAccept; }
+        void setRequiresAccept(const bool a) { requiresAccept = a; }
+        bool getSyncBitSet() const { return syncBitSet; }
+        void setSyncBitSet(const bool s) { syncBitSet = s; }
+        void setManager(SessionContext::AsyncCommandManager *m) { manager.reset(m); }
+
+        /** notify session that this command has completed; a manager must have been set beforehand */
+        void completed(const framing::Invoker::Result& r)
+        {
+            boost::intrusive_ptr<AsyncCommandContext> context(this);
+            manager->completePendingCommand( context, r );
+            manager.reset(0);
+        }
+
+        // to force completion as fast as possible (like when Sync arrives)
+        virtual void flush() = 0;
+    };
+
+ protected:
+    class AsyncCommandManager : public RefCounted    // interface that AsyncCommandContext::completed() reports to
+    {
+     public:
+        virtual void completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>&,
+                                            const framing::Invoker::Result&) = 0;
+    };
+ };
 
 }} // namespace qpid::broker
 

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.cpp Wed Jun  1 20:22:37 2011
@@ -62,7 +62,7 @@ SessionState::SessionState(
       msgBuilder(&broker.getStore()),
       mgmtObject(0),
       rateFlowcontrol(0),
-      asyncCommandCompleter(new AsyncCommandCompleter(this))
+      asyncCommandManager(new AsyncCommandManager(this))
 {
     uint32_t maxRate = broker.getOptions().maxSessionRate;
     if (maxRate) {
@@ -95,7 +95,7 @@ void SessionState::addManagementObject()
 }
 
 SessionState::~SessionState() {
-    asyncCommandCompleter->cancel();
+    asyncCommandManager->cancel();
     semanticState.closed();
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy ();
@@ -126,7 +126,7 @@ bool SessionState::isLocal(const Connect
 
 void SessionState::detach() {
     QPID_LOG(debug, getId() << ": detached on broker.");
-    asyncCommandCompleter->detached();
+    asyncCommandManager->detached();
     disableOutput();
     handler = 0;
     if (mgmtObject != 0)
@@ -147,7 +147,7 @@ void SessionState::attach(SessionHandler
         mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
         mgmtObject->set_channelId (h.getChannel());
     }
-    asyncCommandCompleter->attached();
+    asyncCommandManager->attached();
 }
 
 void SessionState::abort() {
@@ -204,22 +204,22 @@ Manageable::status_t SessionState::Manag
     return status;
 }
 
-void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
+void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id)
+{
     currentCommandComplete = true;      // assumed, can be overridden by invoker method (this sucks).
+    syncCurrentCommand = method->isSync();
+    acceptRequired = false;
     Invoker::Result invocation = invoke(adapter, *method);
-    if (currentCommandComplete) receiverCompleted(id);
-
     if (!invocation.wasHandled()) {
         throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
-    } else if (invocation.hasResult()) {
-        getProxy().getExecution().result(id, invocation.getResult());
     }
 
-    if (method->isSync() && currentCommandComplete) {
-        sendAcceptAndCompletion();
+    if (currentCommandComplete) {
+        completeCommand(id, invocation, false, syncCurrentCommand);
     }
 }
 
+
 struct ScheduledCreditTask : public sys::TimerTask {
     sys::Timer& timer;
     SessionState& sessionState;
@@ -260,6 +260,9 @@ void SessionState::handleContent(AMQFram
         }
         msg->setPublisher(&getConnection());
         msg->getIngressCompletion().begin();
+        currentCommandComplete = true;      // assumed
+        syncCurrentCommand = msg->getFrames().getMethod()->isSync();
+        acceptRequired = msg->requiresAccept();
         semanticState.handle(msg);
         msgBuilder.end();
         IncompleteIngressMsgXfer xfer(this, msg);
@@ -313,17 +316,19 @@ void SessionState::sendAcceptAndCompleti
     sendCompletion();
 }
 
-/** Invoked when the given inbound message is finished being processed
- * by all interested parties (eg. it is done being enqueued to all queues,
- * its credit has been accounted for, etc).  At this point, msg is considered
- * by this receiver as 'completed' (as defined by AMQP 0_10)
- */
-void SessionState::completeRcvMsg(SequenceNumber id,
-                                  bool requiresAccept,
-                                  bool requiresSync)
+/** Complete a received command */
+void SessionState::completeCommand(const SequenceNumber& id,
+                                   const framing::Invoker::Result& results,
+                                   bool requiresAccept,
+                                   bool syncBitSet)
 {
     bool callSendCompletion = false;
     receiverCompleted(id);
+
+    if (results.hasResult()) {
+        getProxy().getExecution().result(id, results.getResult());
+    }
+
     if (requiresAccept)
         // will cause msg's seq to appear in the next message.accept we send.
         accepted.add(id);
@@ -340,7 +345,7 @@ void SessionState::completeRcvMsg(Sequen
     }
 
     // if the sender has requested immediate notification of the completion...
-    if (requiresSync) {
+    if (syncBitSet) {
         sendAcceptAndCompletion();
     } else if (callSendCompletion) {
         sendCompletion();
@@ -427,12 +432,25 @@ void SessionState::addPendingExecutionSy
     if (receiverGetIncomplete().front() < syncCommandId) {
         currentCommandComplete = false;
         pendingExecutionSyncs.push(syncCommandId);
-        asyncCommandCompleter->flushPendingMessages();
+        asyncCommandManager->flushPendingCommands();
         QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
     }
 }
 
 
+void SessionState::registerAsyncCommand(boost::intrusive_ptr<AsyncCommandContext>& aCmd)
+{
+    /** @todo KAG: ensure this is invoked during handleCommand() context! */
+    currentCommandComplete = false;     // suppresses the immediate completeCommand() in handleCommand()
+    asyncCommandManager->addPendingCommand( aCmd, receiverGetCurrent(), acceptRequired, syncCurrentCommand );
+}
+
+
+void SessionState::cancelAsyncCommand(boost::intrusive_ptr<AsyncCommandContext>& aCmd)
+{
+    asyncCommandManager->cancelPendingCommand(aCmd);    // erases aCmd from the pending map and clears its manager
+}
+
 /** factory for creating a reference-counted IncompleteIngressMsgXfer object
  * which will be attached to a message that will be completed asynchronously.
  */
@@ -441,15 +459,14 @@ SessionState::IncompleteIngressMsgXfer::
 {
     boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg));
 
-    // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed.
-    // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
-    if (requiresSync)
+    // this routine is *only* invoked when the message needs to be asynchronously completed.  Otherwise, ::completed()
+    // will be invoked directly.
+    pending = true;
+    boost::intrusive_ptr<SessionContext::AsyncCommandContext>ctxt(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cb));
+    session->registerAsyncCommand(ctxt);
+    if (ctxt->getSyncBitSet()) {
+        // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
         msg->flush();
-    else {
-        // otherwise, we need to track this message in order to flush it if an execution.sync arrives
-        // before it has been completed (see flushPendingMessages())
-        pending = true;
-        completerContext->addPendingMessage(msg);
     }
     return cb;
 }
@@ -461,110 +478,129 @@ SessionState::IncompleteIngressMsgXfer::
  */
 void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
 {
-    if (pending) completerContext->deletePendingMessage(id);
     if (!sync) {
         /** note well: this path may execute in any thread.  It is safe to access
          * the scheduledCompleterContext, since *this has a shared pointer to it.
          * but not session!
          */
         session = 0;
-        QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
-        completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
+        QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << getId());
+        AsyncCommandContext::completed(framing::Invoker::Result());   // qualified: completed(bool) hides the base overload
     } else {
         // this path runs directly from the ac->end() call in handleContent() above,
         // so *session is definately valid.
         if (session->isAttached()) {
-            QPID_LOG(debug, ": receive completed for msg seq=" << id);
-            session->completeRcvMsg(id, requiresAccept, requiresSync);
+            QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId());  // ask msg: an unregistered context still has id 0
+            session->completeCommand(msg->getCommandId(), framing::Invoker::Result(), msg->requiresAccept(), msg->getFrames().getMethod()->isSync());
+        }
+        if (pending) {
+            boost::intrusive_ptr<AsyncCommandContext> p(this);
+            session->cancelAsyncCommand(p);
         }
     }
-    completerContext = boost::intrusive_ptr<AsyncCommandCompleter>();
+}
+
+
+void SessionState::IncompleteIngressMsgXfer::flush()
+{
+    msg->flush();   // AsyncCommandContext::flush() implementation: delegates to the held message
+}
 
 
 /** Scheduled from an asynchronous command's completed callback to run on
  * the IO thread.
  */
-void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt)
+void SessionState::AsyncCommandManager::schedule(boost::intrusive_ptr<AsyncCommandManager> ctxt)
 {
-    ctxt->completeCommands();
+    ctxt->processCompletedCommands();
 }
 
 
-/** Track an ingress message that is pending completion */
-void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg)
+void SessionState::AsyncCommandManager::addPendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd,
+                                                          framing::SequenceNumber seq,
+                                                          bool acceptRequired, bool syncBitSet)
 {
+    cmd->setId(seq);
+    cmd->setRequiresAccept(acceptRequired);
+    cmd->setSyncBitSet(syncBitSet);
+    cmd->setManager(this);
     qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-    std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg);
-    bool unique = pendingMsgs.insert(item).second;
-    assert(unique);
+    std::pair<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> > item(cmd->getId(), cmd);
+    bool unique = pendingCommands.insert(item).second;
+    if (!unique) assert(false);
 }
 
 
-/** pending message has completed */
-void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id)
+void SessionState::AsyncCommandManager::cancelPendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd)
 {
     qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-    pendingMsgs.erase(id);
+    pendingCommands.erase(cmd->getId());
+    cmd->setManager(0);
 }
 
 
+
 /** done when an execution.sync arrives */
-void SessionState::AsyncCommandCompleter::flushPendingMessages()
+void SessionState::AsyncCommandManager::flushPendingCommands()
 {
-    std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy;
+    std::map<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> > copy;
     {
         qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-        pendingMsgs.swap(copy);    // we've only tracked these in case a flush is needed, so nuke 'em now.
+        copy = pendingCommands;
     }
     // drop lock, so it is safe to call "flush()"
-    for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin();
+    for (std::map<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> >::iterator i = copy.begin();
          i != copy.end(); ++i) {
         i->second->flush();
     }
 }
 
 
-/** mark an ingress Message.Transfer command as completed.
+/** mark a pending command as completed.
  * This method must be thread safe - it may run on any thread.
  */
-void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd,
-                                                                bool requiresAccept,
-                                                                bool requiresSync)
+void SessionState::AsyncCommandManager::completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd,
+                                                               const framing::Invoker::Result& result)
 {
     qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-
     if (session && isAttached) {
-        MessageInfo msg(cmd, requiresAccept, requiresSync);
-        completedMsgs.push_back(msg);
-        if (completedMsgs.size() == 1) {
+        CommandInfo status(cmd->getId(),
+                           result,
+                           cmd->getRequiresAccept(),
+                           cmd->getSyncBitSet());
+        completedCommands.push_back(status);
+        if (completedCommands.size() == 1) {
             session->getConnection().requestIOProcessing(boost::bind(&schedule,
-                                                                     session->asyncCommandCompleter));
+                                                                     session->asyncCommandManager));
         }
     }
+    pendingCommands.erase(cmd->getId());
 }
 
 
 /** Cause the session to complete all completed commands.
  * Executes on the IO thread.
  */
-void SessionState::AsyncCommandCompleter::completeCommands()
+void SessionState::AsyncCommandManager::processCompletedCommands()
 {
     qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
 
     // when session is destroyed, it clears the session pointer via cancel().
     if (session && session->isAttached()) {
-        for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin();
-             msg != completedMsgs.end(); ++msg) {
-            session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync);
+        for (std::vector<CommandInfo>::iterator cmd = completedCommands.begin();
+             cmd != completedCommands.end(); ++cmd) {
+            session->completeCommand(cmd->id,
+                                     cmd->results,
+                                     cmd->requiresAccept,
+                                     cmd->syncBitSet);
         }
     }
-    completedMsgs.clear();
+    completedCommands.clear();
 }
 
 
 /** cancel any pending calls to scheduleComplete */
-void SessionState::AsyncCommandCompleter::cancel()
+void SessionState::AsyncCommandManager::cancel()
 {
     qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
     session = 0;
@@ -573,7 +609,7 @@ void SessionState::AsyncCommandCompleter
 
 /** inform the completer that the session has attached,
  * allows command completion scheduling from any thread */
-void SessionState::AsyncCommandCompleter::attached()
+void SessionState::AsyncCommandManager::attached()
 {
     qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
     isAttached = true;
@@ -582,7 +618,7 @@ void SessionState::AsyncCommandCompleter
 
 /** inform the completer that the session has detached,
  * disables command completion scheduling from any thread */
-void SessionState::AsyncCommandCompleter::detached()
+void SessionState::AsyncCommandManager::detached()
 {
     qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
     isAttached = false;

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.h?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.h Wed Jun  1 20:22:37 2011
@@ -25,6 +25,7 @@
 #include "qpid/SessionState.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/ServerInvoker.h"
 #include "qpid/sys/Time.h"
 #include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/broker/Session.h"
@@ -133,8 +134,17 @@ class SessionState : public qpid::Sessio
     // belonging to inter-broker bridges
     void addManagementObject();
 
+    // allows commands (dispatched via handleCommand()) to inform the session
+    // that they may complete asynchronously.
+    void registerAsyncCommand(boost::intrusive_ptr<SessionContext::AsyncCommandContext>&);
+    void cancelAsyncCommand(boost::intrusive_ptr<SessionContext::AsyncCommandContext>&);
+    
   private:
     void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
+    /** finish command processing started in handleCommand() */
+    void completeCommand(const framing::SequenceNumber&,
+                         const framing::Invoker::Result&, bool requiresAccept,
+                         bool syncBitSet);
     void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
 
     // indicate that the given ingress msg has been completely received by the
@@ -173,99 +183,100 @@ class SessionState : public qpid::Sessio
     boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
     boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
 
-    // sequence numbers for pending received Execution.Sync commands
+    // sequence numbers of received Execution.Sync commands that are pending completion.
     std::queue<SequenceNumber> pendingExecutionSyncs;
+
+    // true if command completes during call to handleCommand()
     bool currentCommandComplete;
+    bool syncCurrentCommand;
+    bool acceptRequired;
 
+ protected:
     /** This class provides a context for completing asynchronous commands in a thread
      * safe manner.  Asynchronous commands save their completion state in this class.
-     * This class then schedules the completeCommands() method in the IO thread.
-     * While running in the IO thread, completeCommands() may safely complete all
+     * This class then schedules the processCompletedCommands() method in the IO thread.
+     * While running in the IO thread, processCompletedCommands() may safely complete all
      * saved commands without the risk of colliding with other operations on this
      * SessionState.
      */
-    class AsyncCommandCompleter : public RefCounted {
+    class AsyncCommandManager : public SessionContext::AsyncCommandManager {
     private:
         SessionState *session;
         bool isAttached;
         qpid::sys::Mutex completerLock;
 
-        // special-case message.transfer commands for optimization
-        struct MessageInfo {
-            SequenceNumber cmd; // message.transfer command id
-            bool requiresAccept;
-            bool requiresSync;
-        MessageInfo(SequenceNumber c, bool a, bool s)
-        : cmd(c), requiresAccept(a), requiresSync(s) {}
+        /** all commands pending completion */
+        std::map<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> > pendingCommands;
+
+        // Store information about completed commands that are pending the
+        // call to processCompletedCommands()
+        struct CommandInfo {
+            SequenceNumber id;
+            framing::Invoker::Result results;
+            bool requiresAccept;    // only if cmd==Message.transfer
+            bool syncBitSet;
+        CommandInfo(SequenceNumber c, const framing::Invoker::Result& r, bool a, bool s)
+        : id(c), results(r), requiresAccept(a), syncBitSet(s) {}
         };
-        std::vector<MessageInfo> completedMsgs;
-        // If an ingress message does not require a Sync, we need to
-        // hold a reference to it in case an Execution.Sync command is received and we
-        // have to manually flush the message.
-        std::map<SequenceNumber, boost::intrusive_ptr<Message> > pendingMsgs;
+        std::vector<CommandInfo> completedCommands;
+
+        /** finish processing all completed commands, runs in IO thread */
+        void processCompletedCommands();
 
-        /** complete all pending commands, runs in IO thread */
-        void completeCommands();
+        /** for scheduling a run of "processCompletedCommands()" on the IO thread */
+        static void schedule(boost::intrusive_ptr<AsyncCommandManager>);
 
-        /** for scheduling a run of "completeCommands()" on the IO thread */
-        static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>);
 
     public:
-        AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {};
-        ~AsyncCommandCompleter() {};
+        AsyncCommandManager(SessionState *s) : session(s), isAttached(s->isAttached()) {};
+        ~AsyncCommandManager() {};
 
         /** track a message pending ingress completion */
-        void addPendingMessage(boost::intrusive_ptr<Message> m);
-        void deletePendingMessage(SequenceNumber id);
-        void flushPendingMessages();
+        //void addPendingMessage(boost::intrusive_ptr<Message> m);
+        //void deletePendingMessage(SequenceNumber id);
+        //void flushPendingMessages();
         /** schedule the processing of a completed ingress message.transfer command */
-        void scheduleMsgCompletion(SequenceNumber cmd,
-                                   bool requiresAccept,
-                                   bool requiresSync);
+        //void scheduleMsgCompletion(SequenceNumber cmd,
+        //                           bool requiresAccept,
+        //                           bool requiresSync);
         void cancel();  // called by SessionState destructor.
         void attached();  // called by SessionState on attach()
         void detached();  // called by SessionState on detach()
-    };
-    boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
 
-    /** Abstract class that represents a single asynchronous command that is
-     * pending completion.
-     */
-    class AsyncCommandContext : public AsyncCompletion::Callback
-    {
-     public:
-        AsyncCommandContext( SessionState *ss, SequenceNumber _id )
-          : id(_id), completerContext(ss->asyncCommandCompleter) {}
-        virtual ~AsyncCommandContext() {}
-
-     protected:
-        SequenceNumber id;
-        boost::intrusive_ptr<AsyncCommandCompleter> completerContext;
+        /** called by async command handlers */
+        void addPendingCommand(boost::intrusive_ptr<AsyncCommandContext>&,
+                               framing::SequenceNumber, bool, bool);
+        void cancelPendingCommand(boost::intrusive_ptr<AsyncCommandContext>&);
+        void flushPendingCommands();
+        void completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>&, const framing::Invoker::Result&);
     };
+    boost::intrusive_ptr<AsyncCommandManager> asyncCommandManager;
+
+ private:
 
     /** incomplete Message.transfer commands - inbound to broker from client
      */
-    class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext
+    class IncompleteIngressMsgXfer : public AsyncCommandContext,
+                                     public AsyncCompletion::Callback
     {
      public:
         IncompleteIngressMsgXfer( SessionState *ss,
                                   boost::intrusive_ptr<Message> m )
-          : AsyncCommandContext(ss, m->getCommandId()),
-          session(ss),
+          : session(ss),
           msg(m),
-          requiresAccept(m->requiresAccept()),
-          requiresSync(m->getFrames().getMethod()->isSync()),
           pending(false) {}
         virtual ~IncompleteIngressMsgXfer() {};
 
+        // async completion calls
         virtual void completed(bool);
         virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
 
+        // async cmd calls
+        virtual void flush();
+
      private:
         SessionState *session;  // only valid if sync flag in callback is true
         boost::intrusive_ptr<Message> msg;
-        bool requiresAccept;
-        bool requiresSync;
         bool pending;   // true if msg saved on pending list...
     };
 

Modified: qpid/branches/qpid-3079/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/tests/QueueTest.cpp?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/tests/QueueTest.cpp Wed Jun  1 20:22:37 2011
@@ -366,7 +366,7 @@ class TestMessageStoreOC : public Messag
 
     virtual void dequeue(TransactionContext*,
                  const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
-                 const PersistableQueue& /*queue*/)
+                 const boost::shared_ptr<PersistableQueue>& /*queue*/)
     {
         if (error) throw Exception("Dequeue error test");
         deqCnt++;

Modified: qpid/branches/qpid-3079/qpid/cpp/src/tests/TestMessageStore.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/tests/TestMessageStore.h?rev=1130291&r1=1130290&r2=1130291&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/tests/TestMessageStore.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/tests/TestMessageStore.h Wed Jun  1 20:22:37 2011
@@ -41,7 +41,7 @@ class TestMessageStore : public NullMess
 
     void dequeue(TransactionContext*,
                  const boost::intrusive_ptr<PersistableMessage>& msg,
-                 const PersistableQueue& /*queue*/)
+                 const boost::shared_ptr<PersistableQueue>& /*queue*/)
     {
         dequeued.push_back(msg);
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message