qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1101864 - in /qpid/trunk/qpid/cpp/src/qpid/broker: SessionState.cpp SessionState.h
Date Wed, 11 May 2011 13:02:33 GMT
Author: kgiusti
Date: Wed May 11 13:02:32 2011
New Revision: 1101864

URL: http://svn.apache.org/viewvc?rev=1101864&view=rev
Log:
QPID-3252: flush msgs when sync requested.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1101864&r1=1101863&r2=1101864&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Wed May 11 13:02:32 2011
@@ -95,14 +95,13 @@ void SessionState::addManagementObject()
 }
 
 SessionState::~SessionState() {
+    asyncCommandCompleter->cancel();
     semanticState.closed();
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy ();
 
     if (flowControlTimer)
         flowControlTimer->cancel();
-
-    asyncCommandCompleter->cancel();
 }
 
 AMQP_ClientProxy& SessionState::getProxy() {
@@ -428,6 +427,7 @@ void SessionState::addPendingExecutionSy
     if (receiverGetIncomplete().front() < syncCommandId) {
         currentCommandComplete = false;
         pendingExecutionSyncs.push(syncCommandId);
+        asyncCommandCompleter->flushPendingMessages();
         QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
     }
 }
@@ -440,6 +440,17 @@ boost::intrusive_ptr<AsyncCompletion::Ca
 SessionState::IncompleteIngressMsgXfer::clone()
 {
     boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg));
+
+    // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed.
+    // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
+    if (requiresSync)
+        msg->flush();
+    else {
+        // otherwise, we need to track this message in order to flush it if an execution.sync arrives
+        // before it has been completed (see flushPendingMessages())
+        pending = true;
+        completerContext->addPendingMessage(msg);
+    }
     return cb;
 }
 
@@ -450,17 +461,18 @@ SessionState::IncompleteIngressMsgXfer::
  */
 void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
 {
+    if (pending) completerContext->deletePendingMessage(id);
     if (!sync) {
         /** note well: this path may execute in any thread.  It is safe to access
          * the scheduledCompleterContext, since *this has a shared pointer to it.
-         * but not session or msg!
+         * but not session!
          */
-        session = 0; msg = 0;
+        session = 0;
         QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
         completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
     } else {
         // this path runs directly from the ac->end() call in handleContent() above,
-        // so *session and *msg are definately valid.
+        // so *session is definately valid.
         if (session->isAttached()) {
             QPID_LOG(debug, ": receive completed for msg seq=" << id);
             session->completeRcvMsg(id, requiresAccept, requiresSync);
@@ -479,6 +491,40 @@ void SessionState::AsyncCommandCompleter
 }
 
 
+/** Track an ingress message that is pending completion */
+void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg)
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+    std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg);
+    bool unique = pendingMsgs.insert(item).second;
+    assert(unique);
+}
+
+
+/** pending message has completed */
+void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id)
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+    pendingMsgs.erase(id);
+}
+
+
+/** done when an execution.sync arrives */
+void SessionState::AsyncCommandCompleter::flushPendingMessages()
+{
+    std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy;
+    {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+        pendingMsgs.swap(copy);    // we've only tracked these in case a flush is needed, so nuke 'em now.
+    }
+    // drop lock, so it is safe to call "flush()"
+    for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin();
+         i != copy.end(); ++i) {
+        i->second->flush();
+    }
+}
+
+
 /** mark an ingress Message.Transfer command as completed.
  * This method must be thread safe - it may run on any thread.
  */

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1101864&r1=1101863&r2=1101864&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Wed May 11 13:02:32 2011
@@ -199,6 +199,10 @@ class SessionState : public qpid::Sessio
         : cmd(c), requiresAccept(a), requiresSync(s) {}
         };
         std::vector<MessageInfo> completedMsgs;
+        // If an ingress message does not require a Sync, we need to
+        // hold a reference to it in case an Execution.Sync command is received and we
+        // have to manually flush the message.
+        std::map<SequenceNumber, boost::intrusive_ptr<Message> > pendingMsgs;
 
         /** complete all pending commands, runs in IO thread */
         void completeCommands();
@@ -210,7 +214,11 @@ class SessionState : public qpid::Sessio
         AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {};
         ~AsyncCommandCompleter() {};
 
-        /** schedule the completion of an ingress message.transfer command */
+        /** track a message pending ingress completion */
+        void addPendingMessage(boost::intrusive_ptr<Message> m);
+        void deletePendingMessage(SequenceNumber id);
+        void flushPendingMessages();
+        /** schedule the processing of a completed ingress message.transfer command */
         void scheduleMsgCompletion(SequenceNumber cmd,
                                    bool requiresAccept,
                                    bool requiresSync);
@@ -243,20 +251,22 @@ class SessionState : public qpid::Sessio
         IncompleteIngressMsgXfer( SessionState *ss,
                                   boost::intrusive_ptr<Message> m )
           : AsyncCommandContext(ss, m->getCommandId()),
-            session(ss),
-            msg(m.get()),
-            requiresAccept(msg->requiresAccept()),
-            requiresSync(msg->getFrames().getMethod()->isSync()) {};
+          session(ss),
+          msg(m),
+          requiresAccept(m->requiresAccept()),
+          requiresSync(m->getFrames().getMethod()->isSync()),
+          pending(false) {}
         virtual ~IncompleteIngressMsgXfer() {};
 
         virtual void completed(bool);
         virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
 
      private:
-        SessionState *session;  // only valid if sync == true
-        Message *msg;           // only valid if sync == true
+        SessionState *session;  // only valid if sync flag in callback is true
+        boost::intrusive_ptr<Message> msg;
         bool requiresAccept;
         bool requiresSync;
+        bool pending;   // true if msg saved on pending list...
     };
 
     friend class SessionManager;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message