qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r727455 - in /qpid/trunk/qpid/cpp/src/qpid/amqp_0_10: Connection.cpp Connection.h
Date Wed, 17 Dec 2008 18:05:30 GMT
Author: aconway
Date: Wed Dec 17 10:05:30 2008
New Revision: 727455

URL: http://svn.apache.org/viewvc?rev=727455&view=rev
Log:
src/qpid/amqp_0_10/Connection.cpp: allow encoding to be concurrent with adding new frames.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=727455&r1=727454&r2=727455&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Wed Dec 17 10:05:30 2008
@@ -69,7 +69,11 @@
 }
 
 size_t  Connection::encode(const char* buffer, size_t size) {
-    Mutex::ScopedLock l(frameQueueLock);
+    {   // Swap frameQueue data into workQueue to avoid holding lock while we encode.
+        Mutex::ScopedLock l(frameQueueLock);
+        assert(workQueue.empty());
+        workQueue.swap(frameQueue); 
+    }
     framing::Buffer out(const_cast<char*>(buffer), size);
     if (!isClient && !initialized) {
         framing::ProtocolInitiation pi(getVersion());
@@ -78,16 +82,24 @@
         QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi <<
")");
     }
     size_t frameSize=0;
-    while (!frameQueue.empty() && ((frameSize=frameQueue.front().encodedSize()) <=
out.available())) {
-        frameQueue.front().encode(out);
-        QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front());
-        frameQueue.pop_front();
-        buffered -= frameSize;
-        if (frameQueue.empty() && out.available() > 0) connection->doOutput();

+    size_t encoded=0;
+    while (!workQueue.empty() && ((frameSize=workQueue.front().encodedSize()) <=
out.available())) {
+        workQueue.front().encode(out);
+        QPID_LOG(trace, "SENT [" << identifier << "]: " << workQueue.front());
+        workQueue.pop_front();
+        encoded += frameSize;
+        if (workQueue.empty() && out.available() > 0) connection->doOutput();

+    }
+    assert(workQueue.empty() || workQueue.front().encodedSize() <= size);
+    if (!workQueue.empty() && workQueue.front().encodedSize() > size)
+        throw InternalErrorException(QPID_MSG("Frame too large for buffer."));
+    {
+        Mutex::ScopedLock l(frameQueueLock);
+        buffered -= encoded;
+        // Put back any frames we did not encode.
+        frameQueue.insert(frameQueue.begin(), workQueue.begin(), workQueue.end());
+        workQueue.clear();
     }
-    assert(frameQueue.empty() || frameQueue.front().encodedSize() <= size);
-    if (!frameQueue.empty() && frameQueue.front().encodedSize() > size)
-        throw InternalErrorException(QPID_MSG("Could not write frame, too large for buffer."));
     return out.getPosition();
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=727455&r1=727454&r2=727455&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Wed Dec 17 10:05:30 2008
@@ -45,6 +45,7 @@
     typedef std::deque<framing::AMQFrame> FrameQueue;
 
     FrameQueue frameQueue;
+    FrameQueue workQueue;
     bool frameQueueClosed;
     mutable sys::Mutex frameQueueLock;
     sys::OutputControl& output;



Mime
View raw message