qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r580915 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/client: CompletionTracker.h Execution.h ExecutionHandler.cpp ExecutionHandler.h SessionCore.h
Date Mon, 01 Oct 2007 10:24:33 GMT
Author: gsim
Date: Mon Oct  1 03:24:25 2007
New Revision: 580915

URL: http://svn.apache.org/viewvc?rev=580915&view=rev
Log:
Make ExecutionHandler threadsafe for calls that can be made by application threads.
Added generic listener for completion changes.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h?rev=580915&r1=580914&r2=580915&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h Mon Oct  1 03:24:25
2007
@@ -34,8 +34,7 @@
 class CompletionTracker
 {
 public:
-    //typedef boost::function<void()> CompletionListener;    
-    typedef boost::function0<void> CompletionListener;    
+    typedef boost::function<void()> CompletionListener;    
     typedef boost::function<void(const std::string&)> ResultListener;
 
     CompletionTracker();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h?rev=580915&r1=580914&r2=580915&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h Mon Oct  1 03:24:25 2007
@@ -37,6 +37,7 @@
     virtual Demux& getDemux() = 0;
     virtual bool isComplete(const framing::SequenceNumber& id) = 0;
     virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0;
+    virtual void setCompletionListener(boost::function<void()>) = 0;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=580915&r1=580914&r2=580915&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Mon Oct  1 03:24:25
2007
@@ -29,6 +29,7 @@
 using namespace qpid::client;
 using namespace qpid::framing;
 using namespace boost;
+using qpid::sys::Mutex;
 
 bool isMessageMethod(AMQMethodBody* method)
 {
@@ -81,9 +82,12 @@
         throw ConnectionException(530, "Received odd number of elements in ranged mark");
     } else {
         SequenceNumber mark(cumulative);        
-        outgoingCompletionStatus.update(mark, range);
+        {
+            Mutex::ScopedLock l(lock);
+            outgoingCompletionStatus.update(mark, range);
+        }
+        if (completionListener) completionListener();
         completion.completed(outgoingCompletionStatus.mark);
-
         //TODO: signal listeners of early notification?         
     }
 }
@@ -111,6 +115,7 @@
 
 void ExecutionHandler::flushTo(const framing::SequenceNumber& point)
 {
+    Mutex::ScopedLock l(lock);
     if (point > outgoingCompletionStatus.mark) {
         sendFlushRequest();
     }        
@@ -118,12 +123,14 @@
 
 void ExecutionHandler::sendFlushRequest()
 {
+    Mutex::ScopedLock l(lock);
     AMQFrame frame(0, ExecutionFlushBody());
     out(frame);
 }
 
 void ExecutionHandler::syncTo(const framing::SequenceNumber& point)
 {
+    Mutex::ScopedLock l(lock);
     if (point > outgoingCompletionStatus.mark) {
         sendSyncRequest();
     }        
@@ -132,17 +139,21 @@
 
 void ExecutionHandler::sendSyncRequest()
 {
+    Mutex::ScopedLock l(lock);
     AMQFrame frame(0, ExecutionSyncBody());
     out(frame);
 }
 
 void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send)
 {
-    if (id > incomingCompletionStatus.mark) {
-        if (cumulative) {
-            incomingCompletionStatus.update(incomingCompletionStatus.mark, id);
-        } else {
-            incomingCompletionStatus.update(id, id);            
+    {
+        Mutex::ScopedLock l(lock);
+        if (id > incomingCompletionStatus.mark) {
+            if (cumulative) {
+                incomingCompletionStatus.update(incomingCompletionStatus.mark, id);
+            } else {
+                incomingCompletionStatus.update(id, id);            
+            }
         }
     }
     if (send) {
@@ -153,15 +164,17 @@
 
 void ExecutionHandler::sendCompletion()
 {
+    Mutex::ScopedLock l(lock);
     SequenceNumberSet range;
     incomingCompletionStatus.collectRanges(range);
     AMQFrame frame(0, ExecutionCompleteBody(version, incomingCompletionStatus.mark.getValue(),
range));
     out(frame);    
 }
 
-SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener
l)
+SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener
listener)
 {
-    return send(command, l, false);
+    Mutex::ScopedLock l(lock);
+    return send(command, listener, false);
 }
 
 SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener
l, bool hasContent)
@@ -179,9 +192,10 @@
 }
 
 SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent&
content, 
-                                      CompletionTracker::ResultListener l)
+                                      CompletionTracker::ResultListener listener)
 {
-    SequenceNumber id = send(command, l, true);
+    Mutex::ScopedLock l(lock);
+    SequenceNumber id = send(command, listener, true);
     sendContent(content);
     return id;
 }
@@ -227,10 +241,17 @@
 
 bool ExecutionHandler::isComplete(const SequenceNumber& id)
 {
+    Mutex::ScopedLock l(lock);
     return outgoingCompletionStatus.covers(id);
 }
 
 bool ExecutionHandler::isCompleteUpTo(const SequenceNumber& id)
 {
+    Mutex::ScopedLock l(lock);
     return outgoingCompletionStatus.mark >= id;
+}
+
+void ExecutionHandler::setCompletionListener(boost::function<void()> l)
+{
+    completionListener = l;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=580915&r1=580914&r2=580915&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Mon Oct  1 03:24:25 2007
@@ -27,6 +27,7 @@
 #include "qpid/framing/FrameSet.h"
 #include "qpid/framing/MethodContent.h"
 #include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/Mutex.h"
 #include "ChainableFrameHandler.h"
 #include "CompletionTracker.h"
 #include "Correlator.h"
@@ -49,8 +50,10 @@
     Correlator correlation;
     CompletionTracker completion;
     Demux demux;
+    sys::Mutex lock;
     framing::ProtocolVersion version;
     uint64_t maxFrameSize;
+    boost::function<void()> completionListener;
 
     void complete(uint32_t mark, const framing::SequenceNumberSet& range);    
     void flush();
@@ -90,6 +93,8 @@
     Correlator& getCorrelator() { return correlation; }
     CompletionTracker& getCompletionTracker() { return completion; }
     Demux& getDemux() { return demux; }
+
+    void setCompletionListener(boost::function<void()>);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h?rev=580915&r1=580914&r2=580915&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h Mon Oct  1 03:24:25 2007
@@ -56,7 +56,7 @@
     SessionHandler l2;
     ExecutionHandler l3;
     framing::Uuid uuid;
-    bool sync;
+    volatile bool sync;
     Reason reason;
 
   protected:



Mime
View raw message