Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 58947 invoked from network); 1 Oct 2007 10:25:28 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 1 Oct 2007 10:25:28 -0000 Received: (qmail 7240 invoked by uid 500); 1 Oct 2007 10:25:18 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 7229 invoked by uid 500); 1 Oct 2007 10:25:18 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Received: (qmail 7220 invoked by uid 99); 1 Oct 2007 10:25:18 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Oct 2007 03:25:18 -0700 X-ASF-Spam-Status: No, hits=-98.8 required=10.0 tests=ALL_TRUSTED,DNS_FROM_DOB,RCVD_IN_DOB X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Oct 2007 10:25:28 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id C19301A9832; Mon, 1 Oct 2007 03:24:37 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r580915 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/client: CompletionTracker.h Execution.h ExecutionHandler.cpp ExecutionHandler.h SessionCore.h Date: Mon, 01 Oct 2007 10:24:33 -0000 To: qpid-commits@incubator.apache.org From: gsim@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20071001102437.C19301A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gsim Date: Mon Oct 1 03:24:25 2007 New Revision: 580915 URL: http://svn.apache.org/viewvc?rev=580915&view=rev Log: Make ExecutionHandler threadsafe for calls that can be made 
by application threads. Added generic listener for completion changes. Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h?rev=580915&r1=580914&r2=580915&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h Mon Oct 1 03:24:25 2007 @@ -34,8 +34,7 @@ class CompletionTracker { public: - //typedef boost::function CompletionListener; - typedef boost::function0 CompletionListener; + typedef boost::function CompletionListener; typedef boost::function ResultListener; CompletionTracker(); Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h?rev=580915&r1=580914&r2=580915&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h Mon Oct 1 03:24:25 2007 @@ -37,6 +37,7 @@ virtual Demux& getDemux() = 0; virtual bool isComplete(const framing::SequenceNumber& id) = 0; virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0; + virtual void setCompletionListener(boost::function) = 0; }; }} Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=580915&r1=580914&r2=580915&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Mon Oct 1 03:24:25 2007 @@ -29,6 +29,7 @@ using namespace qpid::client; using namespace qpid::framing; using namespace boost; +using qpid::sys::Mutex; bool isMessageMethod(AMQMethodBody* method) { @@ -81,9 +82,12 @@ throw ConnectionException(530, "Received odd number of elements in ranged mark"); } else { SequenceNumber mark(cumulative); - outgoingCompletionStatus.update(mark, range); + { + Mutex::ScopedLock l(lock); + outgoingCompletionStatus.update(mark, range); + } + if (completionListener) completionListener(); completion.completed(outgoingCompletionStatus.mark); - //TODO: signal listeners of early notification? } } @@ -111,6 +115,7 @@ void ExecutionHandler::flushTo(const framing::SequenceNumber& point) { + Mutex::ScopedLock l(lock); if (point > outgoingCompletionStatus.mark) { sendFlushRequest(); } @@ -118,12 +123,14 @@ void ExecutionHandler::sendFlushRequest() { + Mutex::ScopedLock l(lock); AMQFrame frame(0, ExecutionFlushBody()); out(frame); } void ExecutionHandler::syncTo(const framing::SequenceNumber& point) { + Mutex::ScopedLock l(lock); if (point > outgoingCompletionStatus.mark) { sendSyncRequest(); } @@ -132,17 +139,21 @@ void ExecutionHandler::sendSyncRequest() { + Mutex::ScopedLock l(lock); AMQFrame frame(0, ExecutionSyncBody()); out(frame); } void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send) { - if (id > incomingCompletionStatus.mark) { - if (cumulative) { - incomingCompletionStatus.update(incomingCompletionStatus.mark, id); - } else { - incomingCompletionStatus.update(id, id); + { + Mutex::ScopedLock l(lock); + if (id > incomingCompletionStatus.mark) 
{ + if (cumulative) { + incomingCompletionStatus.update(incomingCompletionStatus.mark, id); + } else { + incomingCompletionStatus.update(id, id); + } } } if (send) { @@ -153,15 +164,17 @@ void ExecutionHandler::sendCompletion() { + Mutex::ScopedLock l(lock); SequenceNumberSet range; incomingCompletionStatus.collectRanges(range); AMQFrame frame(0, ExecutionCompleteBody(version, incomingCompletionStatus.mark.getValue(), range)); out(frame); } -SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l) +SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener listener) { - return send(command, l, false); + Mutex::ScopedLock l(lock); + return send(command, listener, false); } SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent) @@ -179,9 +192,10 @@ } SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content, - CompletionTracker::ResultListener l) + CompletionTracker::ResultListener listener) { - SequenceNumber id = send(command, l, true); + Mutex::ScopedLock l(lock); + SequenceNumber id = send(command, listener, true); sendContent(content); return id; } @@ -227,10 +241,17 @@ bool ExecutionHandler::isComplete(const SequenceNumber& id) { + Mutex::ScopedLock l(lock); return outgoingCompletionStatus.covers(id); } bool ExecutionHandler::isCompleteUpTo(const SequenceNumber& id) { + Mutex::ScopedLock l(lock); return outgoingCompletionStatus.mark >= id; +} + +void ExecutionHandler::setCompletionListener(boost::function<void()> l) +{ + completionListener = l; } Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=580915&r1=580914&r2=580915&view=diff ============================================================================== --- 
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Mon Oct 1 03:24:25 2007 @@ -27,6 +27,7 @@ #include "qpid/framing/FrameSet.h" #include "qpid/framing/MethodContent.h" #include "qpid/framing/SequenceNumber.h" +#include "qpid/sys/Mutex.h" #include "ChainableFrameHandler.h" #include "CompletionTracker.h" #include "Correlator.h" @@ -49,8 +50,10 @@ Correlator correlation; CompletionTracker completion; Demux demux; + sys::Mutex lock; framing::ProtocolVersion version; uint64_t maxFrameSize; + boost::function<void()> completionListener; void complete(uint32_t mark, const framing::SequenceNumberSet& range); void flush(); @@ -90,6 +93,8 @@ Correlator& getCorrelator() { return correlation; } CompletionTracker& getCompletionTracker() { return completion; } Demux& getDemux() { return demux; } + + void setCompletionListener(boost::function<void()>); }; }} Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h?rev=580915&r1=580914&r2=580915&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h Mon Oct 1 03:24:25 2007 @@ -56,7 +56,7 @@ SessionHandler l2; ExecutionHandler l3; framing::Uuid uuid; - bool sync; + volatile bool sync; Reason reason; protected: