qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1062965 - in /qpid/branches/qpid-2935/qpid/cpp/src: ./ qpid/broker/ tests/
Date Mon, 24 Jan 2011 20:45:56 GMT
Author: kgiusti
Date: Mon Jan 24 20:45:55 2011
New Revision: 1062965

URL: http://svn.apache.org/viewvc?rev=1062965&view=rev
Log:
QPID-2921: (partial) Async completion of message.transfer and execution.sync commands

Added:
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h
Modified:
    qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionContext.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am
    qpid/branches/qpid-2935/qpid/cpp/src/tests/TxPublishTest.cpp

Modified: qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am Mon Jan 24 20:45:55 2011
@@ -548,8 +548,7 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/HandlerImpl.h \
   qpid/broker/HeadersExchange.cpp \
   qpid/broker/HeadersExchange.h \
-  qpid/broker/IncompleteMessageList.cpp \
-  qpid/broker/IncompleteMessageList.h \
+  qpid/broker/AsyncCompletion.h \
   qpid/broker/Link.cpp \
   qpid/broker/Link.h \
   qpid/broker/LinkRegistry.cpp \

Added: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h?rev=1062965&view=auto
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h (added)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h Mon Jan 24 20:45:55
2011
@@ -0,0 +1,171 @@
+#ifndef _Completion_
+#define _Completion_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/sys/AtomicValue.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+    namespace broker {
+
+        /**
+         * Class to implement asynchronous notification of completion.
+         *
+         * Use-case: An "initiator" needs to wait for a set of "completers" to
+         * finish a unit of work before an action can occur.  This object
+         * tracks the progress of the set of completers, and allows the action
+         * to occur once all completers have signalled that they are done.
+         *
+         * The initiator and completers may be running in separate threads.
+         *
+         * The initiating thread is the thread that initiates the action,
+         * i.e. the connection read thread.
+         *
+         * A completing thread is any thread that contributes to completion,
+         * e.g. a store thread that does an async write.
+         * There may be zero or more completers.
+         *
+         * When the work is complete, a callback is invoked.  The callback
+         * may be invoked in the Initiator thread, or one of the Completer
+         * threads. The callback is passed a flag indicating whether or not
+         * the callback is running under the context of the Initiator thread.
+         *
+         * Use model:
+         * 1) Initiator thread invokes begin()
+         * 2) After begin() has been invoked, zero or more Completers invoke
+         * startCompleter().  Completers may be running in the same or
+         * different thread as the Initiator, as long as they guarantee that
+         * startCompleter() is invoked at least once before the Initiator invokes end().
+         * 3) Completers may invoke finishCompleter() at any time, even after the
+         * initiator has invoked end().  finishCompleter() may be called from any
+         * thread.
+         * 4) startCompleter()/finishCompleter() calls "nest": for each call to
+         * startCompleter(), a corresponding call to finishCompleter() must be made.
+         * Once the last finishCompleter() is called, the Completer must no longer
+         * reference the completion object.
+         * 5) The Initiator invokes end() at the point where it has finished
+         * dispatching work to the Completers, and is prepared for the callback
+         * handler to be invoked. Note: if there are no outstanding Completers
+         * pending when the Initiator invokes end(), the callback will be invoked
+         * directly, and the sync parameter will be set true. This indicates to the
+         * Initiator that the callback is executing in the context of the end() call,
+         * and the Initiator is free to optimize the handling of the completion,
+         * assuming no need for synchronization with Completer threads.
+         */
+        class AsyncCompletion {
+      public:
+            // encapsulates the completion callback handler
+            class CompletionHandler {
+          public:
+                virtual void operator() (bool) { /* bool == true if called via end() */}
+            };
+
+      private:
+            mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded;
+            mutable qpid::sys::Monitor callbackLock;
+            bool inCallback;
+            void invokeCallback(bool sync) {
+                qpid::sys::Mutex::ScopedLock l(callbackLock);
+                inCallback = true;
+                if (handler) {
+                    qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
+                    (*handler)(sync);
+                    handler.reset();
+                }
+                inCallback = false;
+                callbackLock.notifyAll();
+            }
+
+      protected:
+            /** Invoked when all completers have signalled that they have completed
+             * (via calls to finishCompleter()).
+             */
+            boost::shared_ptr<CompletionHandler> handler;
+
+      public:
+      AsyncCompletion() : completionsNeeded(0), inCallback(false) {};
+            virtual ~AsyncCompletion() { /* @todo KAG - assert(completionsNeeded.get() ==
0); */ };
+
+            /** True when all outstanding operations have compeleted
+             */
+            bool isDone()
+            {
+                qpid::sys::Mutex::ScopedLock l(callbackLock);
+                return !inCallback && completionsNeeded.get() == 0;
+            }
+
+            /** Called to signal the start of an asynchronous operation.  The operation
+             * is considered pending until finishCompleter() is called.
+             * E.g. called when initiating an async store operation.
+             */
+            void startCompleter() { ++completionsNeeded; }
+
+            /** Called by completer to signal that it has finished the operation started
+             * when startCompleter() was invoked.
+             * e.g. called when async write complete.
+             */
+            void finishCompleter()
+            {
+                if (--completionsNeeded == 0) {
+                    invokeCallback(false);
+                }
+            }
+
+            /** called by initiator before any calls to startCompleter can be done.
+             */
+            void begin() { startCompleter(); };
+
+            /** called by initiator after all potential completers have called
+             * startCompleter().
+             */
+            //void end(CompletionHandler::shared_ptr& _handler)
+            void end(boost::shared_ptr<CompletionHandler> _handler)
+            {
+                assert(completionsNeeded.get() > 0);    // ensure begin() has been called!
+                handler = _handler;
+                if (--completionsNeeded == 0) {
+                    invokeCallback(true);
+                }
+            }
+
+            /** may be called by Initiator to cancel the callback registered by end()
+             */
+            void cancel() {
+                qpid::sys::Mutex::ScopedLock l(callbackLock);
+                while (inCallback) callbackLock.wait();
+                handler.reset();
+            }
+
+            /** may be called by Initiator after all completers have been added but
+             * prior to calling end().  Allows initiator to determine if it _really_
+             * needs to wait for pending Completers (e.g. count > 1).
+             */
+            uint32_t getPendingCompleters() { return completionsNeeded.get(); }
+        };
+
+    }}  // qpid::broker::
+#endif  /*!_Completion_*/

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp Mon Jan 24 20:45:55 2011
@@ -50,14 +50,15 @@ TransferAdapter Message::TRANSFER;
 Message::Message(const framing::SequenceNumber& id) :
     frames(id), persistenceId(0), redelivered(false), loaded(false),
     staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), 
-    expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0),
-    inCallback(false), requiredCredit(0) {}
+    expiration(FAR_FUTURE), dequeueCallback(0),
+    inCallback(false), requiredCredit(0)
+{}
 
 Message::Message(const Message& original) :
     PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false),
loaded(false),
     staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), 
-    expiration(original.expiration), enqueueCallback(0), dequeueCallback(0),
-    inCallback(false), requiredCredit(0) 
+    expiration(original.expiration), dequeueCallback(0),
+    inCallback(false), requiredCredit(0)
 {
     setExpiryPolicy(original.expiryPolicy);
 }
@@ -431,30 +432,12 @@ struct ScopedSet {
 };
 }
 
-void Message::allEnqueuesComplete() {
-    ScopedSet ss(callbackLock, inCallback);
-    MessageCallback* cb = enqueueCallback;
-    if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
-}
-
 void Message::allDequeuesComplete() {
     ScopedSet ss(callbackLock, inCallback);
     MessageCallback* cb = dequeueCallback;
     if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
 }
 
-void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
-    sys::Mutex::ScopedLock l(callbackLock);
-    while (inCallback) callbackLock.wait();
-    enqueueCallback = &cb;
-}
-
-void Message::resetEnqueueCompleteCallback() {
-    sys::Mutex::ScopedLock l(callbackLock);
-    while (inCallback) callbackLock.wait();
-    enqueueCallback = 0;
-}
-
 void Message::setDequeueCompleteCallback(MessageCallback& cb) {
     sys::Mutex::ScopedLock l(callbackLock);
     while (inCallback) callbackLock.wait();

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h Mon Jan 24 20:45:55 2011
@@ -156,10 +156,6 @@ public:
     boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const;
     void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor);
 
-    /** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */
-    void setEnqueueCompleteCallback(MessageCallback& cb);
-    void resetEnqueueCompleteCallback();
-
     /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
     void setDequeueCompleteCallback(MessageCallback& cb);
     void resetDequeueCompleteCallback();
@@ -170,7 +166,6 @@ public:
     typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
 
     MessageAdapter& getAdapter() const;
-    void allEnqueuesComplete();
     void allDequeuesComplete();
 
     mutable sys::Mutex lock;
@@ -192,7 +187,6 @@ public:
     mutable boost::intrusive_ptr<Message> empty;
 
     sys::Monitor callbackLock;
-    MessageCallback* enqueueCallback;
     MessageCallback* dequeueCallback;
     bool inCallback;
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Mon Jan 24 20:45:55
2011
@@ -34,7 +34,6 @@ class MessageStore;
 PersistableMessage::~PersistableMessage() {}
 
 PersistableMessage::PersistableMessage() :
-    asyncEnqueueCounter(0), 
     asyncDequeueCounter(0),
     store(0)
 {}
@@ -68,24 +67,6 @@ bool PersistableMessage::isContentReleas
     return contentReleaseState.released;
 }
        
-bool PersistableMessage::isEnqueueComplete() {
-    sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
-    return asyncEnqueueCounter == 0;
-}
-
-void PersistableMessage::enqueueComplete() {
-    bool notify = false;
-    {
-        sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
-        if (asyncEnqueueCounter > 0) {
-            if (--asyncEnqueueCounter == 0) {
-                notify = true;
-            }
-        }
-    }
-    if (notify) 
-        allEnqueuesComplete();
-}
 
 bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
     if (store && (queue->getPersistenceId()!=0)) {
@@ -109,12 +90,7 @@ void PersistableMessage::addToSyncList(P
 
 void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store)
{ 
     addToSyncList(queue, _store);
-    enqueueAsync();
-}
-
-void PersistableMessage::enqueueAsync() { 
-    sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
-    asyncEnqueueCounter++; 
+    enqueueStart();
 }
 
 bool PersistableMessage::isDequeueComplete() { 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.h Mon Jan 24 20:45:55
2011
@@ -31,6 +31,7 @@
 #include "qpid/framing/amqp_types.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/broker/PersistableQueue.h"
+#include "qpid/broker/AsyncCompletion.h"
 
 namespace qpid {
 namespace broker {
@@ -43,18 +44,18 @@ class MessageStore;
 class PersistableMessage : public Persistable
 {
     typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
-    sys::Mutex asyncEnqueueLock;
     sys::Mutex asyncDequeueLock;
     sys::Mutex storeLock;
-       
+
     /**
-     * Tracks the number of outstanding asynchronous enqueue
-     * operations. When the message is enqueued asynchronously the
-     * count is incremented; when that enqueue completes it is
-     * decremented. Thus when it is 0, there are no outstanding
-     * enqueues.
+     * Tracks the number of outstanding asynchronous operations that must
+     * complete before the message can be considered safely received by the
+     * broker.  E.g. all enqueues have completed, the message has been written
+     * to store, credit has been replenished, etc. Once all outstanding
+     * operations have completed, the transfer of this message from the client
+     * may be considered complete.
      */
-    int asyncEnqueueCounter;
+    AsyncCompletion receiveCompletion;
 
     /**
      * Tracks the number of outstanding asynchronous dequeue
@@ -65,7 +66,6 @@ class PersistableMessage : public Persis
      */
     int asyncDequeueCounter;
 
-    void enqueueAsync();
     void dequeueAsync();
 
     syncList synclist;
@@ -80,8 +80,6 @@ class PersistableMessage : public Persis
     ContentReleaseState contentReleaseState;
 
   protected:
-    /** Called when all enqueues are complete for this message. */
-    virtual void allEnqueuesComplete() = 0;
     /** Called when all dequeues are complete for this message. */
     virtual void allDequeuesComplete() = 0;
 
@@ -115,9 +113,9 @@ class PersistableMessage : public Persis
 
     virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
 
-    QPID_BROKER_EXTERN bool isEnqueueComplete();
-
-    QPID_BROKER_EXTERN void enqueueComplete();
+    QPID_BROKER_EXTERN bool isReceiveComplete() { return receiveCompletion.isDone(); }
+    QPID_BROKER_EXTERN void enqueueStart() { receiveCompletion.startCompleter(); }
+    QPID_BROKER_EXTERN void enqueueComplete() { receiveCompletion.finishCompleter(); }
 
     QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
                                          MessageStore* _store);
@@ -133,7 +131,8 @@ class PersistableMessage : public Persis
     bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
     
     void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store);
-    
+
+    QPID_BROKER_EXTERN AsyncCompletion& getReceiveCompletion() { return receiveCompletion;
}
 };
 
 }}

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp Mon Jan 24 20:45:55 2011
@@ -157,13 +157,8 @@ void Queue::deliver(boost::intrusive_ptr
         //drop message
         QPID_LOG(info, "Dropping excluded message from " << getName());
     } else {
-        // if no store then mark as enqueued
-        if (!enqueue(0, msg)){
-            push(msg);
-            msg->enqueueComplete();
-        }else {
-            push(msg);
-        }
+        enqueue(0, msg);
+        push(msg);
         mgntEnqStats(msg);
         QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
     }
@@ -688,7 +683,7 @@ uint32_t Queue::getEnqueueCompleteMessag
         //NOTE: don't need to use checkLvqReplace() here as it
         //is only relevant for LVQ which does not support persistence
         //so the enqueueComplete check has no effect
-        if ( i->payload->isEnqueueComplete() ) count ++;
+        if ( i->payload->isReceiveComplete() ) count ++;
     }
     
     return count;

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Jan 24 20:45:55
2011
@@ -24,6 +24,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/management/ManagementAgent.h"
+#include "qpid/broker/SessionState.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
 #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
@@ -586,7 +587,12 @@ framing::MessageResumeResult SessionAdap
     
 
 
-void SessionAdapter::ExecutionHandlerImpl::sync() {} //essentially a no-op
+void SessionAdapter::ExecutionHandlerImpl::sync()
+{
+    session.addPendingExecutionSync();
+    /** @todo KAG - need a generic mechanism to allow a command to returning "not completed"
status back to SessionState */
+
+}
 
 void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& /*commandId*/,
const string& /*value*/)
 {

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionContext.h?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionContext.h Mon Jan 24 20:45:55
2011
@@ -46,6 +46,7 @@ class SessionContext : public OwnershipT
     virtual Broker& getBroker() = 0;
     virtual uint16_t getChannel() const = 0;
     virtual const SessionId& getSessionId() const = 0;
+    virtual void addPendingExecutionSync() = 0;
 };
 
 }} // namespace qpid::broker

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Jan 24 20:45:55
2011
@@ -59,7 +59,6 @@ SessionState::SessionState(
       semanticState(*this, *this),
       adapter(semanticState),
       msgBuilder(&broker.getStore()),
-      enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
       mgmtObject(0),
       rateFlowcontrol(0)
 {
@@ -94,6 +93,18 @@ SessionState::~SessionState() {
 
     if (flowControlTimer)
         flowControlTimer->cancel();
+
+    // clean up any outstanding incomplete receive messages
+
+    qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock);
+    while (!incompleteRcvMsgs.empty()) {
+        boost::shared_ptr<IncompleteRcvMsg> ref(incompleteRcvMsgs.front());
+        incompleteRcvMsgs.pop_front();
+        {
+            qpid::sys::ScopedUnlock<Mutex> ul(incompleteRcvMsgsLock);
+            ref->cancel();
+        }
+    }
 }
 
 AMQP_ClientProxy& SessionState::getProxy() {
@@ -195,15 +206,17 @@ Manageable::status_t SessionState::Manag
 }
 
 void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber&
id) {
+    currentCommandComplete = true;      // assumed, can be overridden by invoker method (this
sucks).
     Invoker::Result invocation = invoke(adapter, *method);
-    receiverCompleted(id);
+    if (currentCommandComplete) receiverCompleted(id);
+
     if (!invocation.wasHandled()) {
         throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
     } else if (invocation.hasResult()) {
         getProxy().getExecution().result(id, invocation.getResult());
     }
-    if (method->isSync()) {
-        incomplete.process(enqueuedOp, true);
+
+    if (method->isSync() && currentCommandComplete) {
         sendAcceptAndCompletion();
     }
 }
@@ -247,21 +260,24 @@ void SessionState::handleContent(AMQFram
             msg->getFrames().append(header);
         }
         msg->setPublisher(&getConnection());
+
+        msg->getReceiveCompletion().begin();
         semanticState.handle(msg);
         msgBuilder.end();
-
-        if (msg->isEnqueueComplete()) {
-            enqueued(msg);
-        } else {
-            incomplete.add(msg);
-        }
-
-        //hold up execution until async enqueue is complete
-        if (msg->getFrames().getMethod()->isSync()) {
-            incomplete.process(enqueuedOp, true);
-            sendAcceptAndCompletion();
+        if (msg->getReceiveCompletion().getPendingCompleters() == 1) {
+            // There are no other pending receive completers (just this SessionState).
+            // Mark the message as completed.
+            completeRcvMsg( msg );
         } else {
-            incomplete.process(enqueuedOp, false);
+            // There are outstanding receive completers.  Save the message until
+            // they are all done.
+            QPID_LOG(debug, getId() << ": delaying completion of msg seq=" <<
msg->getCommandId());
+            boost::shared_ptr<IncompleteRcvMsg> pendingMsg(new IncompleteRcvMsg(*this,
msg));
+            {
+                qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock);
+                incompleteRcvMsgs.push_back(pendingMsg);
+            }
+            msg->getReceiveCompletion().end( pendingMsg );   // allows others to complete
         }
     }
 
@@ -312,11 +328,36 @@ void SessionState::sendAcceptAndCompleti
     sendCompletion();
 }
 
-void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
+/** Invoked when the given inbound message is finished being processed
+ * by all interested parties (eg. it is done being enqueued to all queues,
+ * its credit has been accounted for, etc).  At this point, msg is considered
+ * by this receiver as 'completed' (as defined by AMQP 0_10)
+ */
+void SessionState::completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg)
 {
+    bool callSendCompletion = false;
     receiverCompleted(msg->getCommandId());
     if (msg->requiresAccept())
+        // will cause msg's seq to appear in the next message.accept we send.
         accepted.add(msg->getCommandId());
+
+    // Are there any outstanding Execution.Sync commands pending the
+    // completion of this msg?  If so, complete them.
+    while (!pendingExecutionSyncs.empty() &&
+           receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) {
+        const SequenceNumber& id = pendingExecutionSyncs.front();
+        pendingExecutionSyncs.pop();
+        QPID_LOG(debug, getId() << ": delayed execution.sync " << id <<
" is completed.");
+        receiverCompleted(id);
+        callSendCompletion = true;   // likely peer is pending for this completion.
+    }
+
+    // if the sender has requested immediate notification of the completion...
+    if (msg->getFrames().getMethod()->isSync()) {
+        sendAcceptAndCompletion();
+    } else if (callSendCompletion) {
+        sendCompletion();
+    }
 }
 
 void SessionState::handleIn(AMQFrame& frame) {
@@ -389,4 +430,80 @@ framing::AMQP_ClientProxy& SessionState:
     return handler->getClusterOrderProxy();
 }
 
+
+// Current received command is an execution.sync command.
+// Complete this command only when all preceding commands have completed.
+// (called via the invoker() in handleCommand() above)
+void SessionState::addPendingExecutionSync()
+{
+    SequenceNumber syncCommandId = receiverGetCurrent();
+    if (receiverGetIncomplete().front() < syncCommandId) {
+        currentCommandComplete = false;
+        pendingExecutionSyncs.push(syncCommandId);
+        QPID_LOG(debug, getId() << ": delaying completion of execution.sync " <<
syncCommandId);
+    }
+}
+
+
+/** Invoked by the asynchronous completer associated with
+ * a received msg that is pending Completion.  May be invoked
+ * by the SessionState directly (sync == true), or some external
+ * entity (!sync).
+ */
+void SessionState::IncompleteRcvMsg::operator() (bool sync)
+{
+    QPID_LOG(debug, ": async completion callback for msg seq=" << msg->getCommandId()
<< " sync=" << sync);
+    boost::shared_ptr<IncompleteRcvMsg> tmp;
+    {
+        qpid::sys::ScopedLock<Mutex> l(session->incompleteRcvMsgsLock);
+        for (std::list< boost::shared_ptr<IncompleteRcvMsg> >::iterator i = session->incompleteRcvMsgs.begin();
+             i != session->incompleteRcvMsgs.end(); ++i) {
+            if (i->get() == this) {
+                tmp.swap(*i);
+                session->incompleteRcvMsgs.remove(*i);
+                break;
+            }
+        }
+    }
+
+    if (session->isAttached()) {
+        if (sync) {
+            QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId());
+            session->completeRcvMsg(msg);
+        } else {    // potentially called from a different thread
+            QPID_LOG(debug, ": scheduling completion for msg seq=" << msg->getCommandId());
+            session->getConnection().requestIOProcessing(boost::bind(&SessionState::IncompleteRcvMsg::scheduledCompleter,
tmp));
+        }
+    }
+}
+
+
+/** Scheduled from IncompleteRcvMsg callback, completes the message receive
+ * asynchronously
+ */
+void SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr<SessionState::IncompleteRcvMsg>
iMsg)
+{
+    QPID_LOG(debug, ": scheduled completion for msg seq=" << iMsg->msg->getCommandId());
+    if (iMsg->session && iMsg->session->isAttached()) {
+        QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg
seq=" << iMsg->msg->getCommandId());
+        iMsg->session->completeRcvMsg(iMsg->msg);
+    }
+}
+
+
+/** Cancels a pending incomplete receive message completion callback.  Note
+ * well: will wait for the callback to finish if it is currently in progress
+ * on another thread.
+ */
+void SessionState::IncompleteRcvMsg::cancel()
+{
+    QPID_LOG(debug, session->getId() << ": cancelling outstanding completion for
msg seq=" << msg->getCommandId());
+    // Cancel the message complete callback.  On return, we are guaranteed there
+    // will be no outstanding calls to SessionState::IncompleteRcvMsg::operator() (bool sync)
+    msg->getReceiveCompletion().cancel();
+    // there may be calls to SessionState::IncompleteRcvMsg::scheduledCompleter() pending,
+    // clear the session so scheduledCompleter() will ignore this IncompleteRcvMsg.
+    session = 0;
+}
+
 }} // namespace qpid::broker

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h Mon Jan 24 20:45:55 2011
@@ -30,10 +30,11 @@
 #include "qmf/org/apache/qpid/broker/Session.h"
 #include "qpid/broker/SessionAdapter.h"
 #include "qpid/broker/DeliveryAdapter.h"
-#include "qpid/broker/IncompleteMessageList.h"
+#include "qpid/broker/AsyncCompletion.h"
 #include "qpid/broker/MessageBuilder.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/SemanticState.h"
+#include "qpid/sys/Monitor.h"
 
 #include <boost/noncopyable.hpp>
 #include <boost/scoped_ptr.hpp>
@@ -122,11 +123,15 @@ class SessionState : public qpid::Sessio
 
     const SessionId& getSessionId() const { return getId(); }
 
+    // Used by ExecutionHandler sync command processing.  Notifies
+    // the SessionState of a received Execution.Sync command.
+    void addPendingExecutionSync();
+
   private:
 
     void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber&
id);
     void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
-    void enqueued(boost::intrusive_ptr<Message> msg);
+    void completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg);
 
     void handleIn(framing::AMQFrame& frame);
     void handleOut(framing::AMQFrame& frame);
@@ -152,8 +157,6 @@ class SessionState : public qpid::Sessio
     SemanticState semanticState;
     SessionAdapter adapter;
     MessageBuilder msgBuilder;
-    IncompleteMessageList incomplete;
-    IncompleteMessageList::CompletionListener enqueuedOp;
     qmf::org::apache::qpid::broker::Session* mgmtObject;
     qpid::framing::SequenceSet accepted;
 
@@ -162,7 +165,28 @@ class SessionState : public qpid::Sessio
     boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
     boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
 
+    // sequence numbers for pending received Execution.Sync commands
+    std::queue<SequenceNumber> pendingExecutionSyncs;
+    bool currentCommandComplete;
+
+    class IncompleteRcvMsg : public AsyncCompletion::CompletionHandler
+    {
+  public:
+        IncompleteRcvMsg(SessionState& _session, boost::intrusive_ptr<Message>
_msg)
+          : session(&_session), msg(_msg) {}
+        virtual void operator() (bool sync);
+        void cancel();   // cancel pending incomplete callback [operator() above].
+
+  private:
+        SessionState *session;
+        boost::intrusive_ptr<Message> msg;
+        static void scheduledCompleter( boost::shared_ptr<IncompleteRcvMsg> incompleteMsg
);
+    };
+    std::list< boost::shared_ptr<IncompleteRcvMsg> > incompleteRcvMsgs;
+    qpid::sys::Mutex incompleteRcvMsgsLock;
+
     friend class SessionManager;
+    friend class IncompleteRcvMsg;
 };
 
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am Mon Jan 24 20:45:55 2011
@@ -87,7 +87,6 @@ unit_test_SOURCES= unit_test.cpp unit_te
 	InlineVector.cpp \
 	SequenceSet.cpp \
 	StringUtils.cpp \
-	IncompleteMessageList.cpp \
 	RangeSet.cpp \
 	AtomicValue.cpp \
 	QueueTest.cpp \

Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/TxPublishTest.cpp?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/TxPublishTest.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/TxPublishTest.cpp Mon Jan 24 20:45:55 2011
@@ -74,7 +74,7 @@ QPID_AUTO_TEST_CASE(testPrepare)
     BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[0].second);
     BOOST_CHECK_EQUAL(string("queue2"), t.store.enqueued[1].first);
     BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[1].second);
-    BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isEnqueueComplete());
+    BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isReceiveComplete());
 }
 
 QPID_AUTO_TEST_CASE(testCommit)
@@ -87,7 +87,7 @@ QPID_AUTO_TEST_CASE(testCommit)
     BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount());
     intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload;
 
-    BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isEnqueueComplete());
+    BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isReceiveComplete());
     BOOST_CHECK_EQUAL(t.msg, msg_dequeue);
 
     BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount());



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message