qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From astitc...@apache.org
Subject svn commit: r604983 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: framing/FrameSet.cpp sys/posix/AsynchIO.cpp sys/posix/Time.cpp
Date Mon, 17 Dec 2007 20:08:46 GMT
Author: astitcher
Date: Mon Dec 17 12:08:46 2007
New Revision: 604983

URL: http://svn.apache.org/viewvc?rev=604983&view=rev
Log:
* Limit the time allowed for reading on a single connection in a single go to 2ms
* Limit the bytes allowed to be written on a connection in a single go to the max ever read
* Small performance fix for appending to FrameSets

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Time.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp?rev=604983&r1=604982&r2=604983&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp Mon Dec 17 12:08:46 2007
@@ -28,7 +28,7 @@
 using namespace qpid::framing;
 using namespace boost;
 
-FrameSet::FrameSet(const SequenceNumber& _id) : id(_id) {}
+FrameSet::FrameSet(const SequenceNumber& _id) : id(_id) {parts.reserve(4);}
 
 void FrameSet::append(AMQFrame& part)
 {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=604983&r1=604982&r2=604983&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Mon Dec 17 12:08:46 2007
@@ -20,9 +20,12 @@
  */
 
 #include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Time.h"
 
 #include "check.h"
 
+// TODO The basic algorithm here is not really POSIX specific and with a bit more abstraction
+// could (should) be promoted to be platform portable
 #include <unistd.h>
 #include <sys/socket.h>
 #include <signal.h>
@@ -42,6 +45,18 @@
 	::signal(SIGPIPE, SIG_IGN);
 }
 
+/*
+ * We keep per thread state to avoid locking overhead. The assumption is that
+ * on average all the connections are serviced by all the threads so the state
+ * recorded in each thread is about the same. If this turns out not to be the
+ * case we could rebalance the info occasionally.  
+ */
+__thread int threadReadTotal = 0;
+__thread int threadMaxRead = 0;
+__thread int threadReadCount = 0;
+__thread int threadWriteTotal = 0;
+__thread int threadWriteCount = 0;
+__thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms
 }
 
 /*
@@ -182,6 +197,8 @@
  * it in
  */
 void AsynchIO::readable(DispatchHandle& h) {
+    int readTotal = 0;
+    AbsTime readStartTime = AbsTime::now();
     do {
         // (Try to) get a buffer
         if (!bufferQueue.empty()) {
@@ -193,11 +210,20 @@
             int rc = h.getSocket().read(buff->bytes + buff->dataCount, readCount);
             if (rc > 0) {
                 buff->dataCount += rc;
+                threadReadTotal += rc;
+                readTotal += rc;
+
                 readCallback(*this, buff);
                 if (rc != readCount) {
                     // If we didn't fill the read buffer then time to stop reading
-                    return;
+                    break;
+                }
+                
+                // Stop reading if we've overrun our timeslot
+                if (Duration(readStartTime, AbsTime::now()) > threadMaxReadTimeNs) {
+                    break;
                 }
+                
             } else {
                 // Put buffer back (at front so it doesn't interfere with unread buffers)
                 bufferQueue.push_front(buff);
@@ -206,11 +232,11 @@
                 if (rc == 0 || errno == ECONNRESET) {
                     eofCallback(*this);
                     h.unwatchRead();
-                    return;
+                    break;
                 } else if (errno == EAGAIN) {
                     // We have just put a buffer back so we know
                     // we can carry on watching for reads
-                    return;
+                    break;
                 } else {
                     QPID_POSIX_CHECK(rc);
                 }
@@ -223,17 +249,22 @@
             // If we still have no buffers we can't do anything more
             if (bufferQueue.empty()) {
                 h.unwatchRead();
-                return;
+                break;
             }
             
         }
     } while (true);
+
+    ++threadReadCount;
+    threadMaxRead = std::max(threadMaxRead, readTotal);
+    return;
 }
 
 /*
  * We carry on writing whilst we have data to write and we can write
  */
 void AsynchIO::writeable(DispatchHandle& h) {
+    int writeTotal = 0;
     do {
         // See if we've got something to write
         if (!writeQueue.empty()) {
@@ -244,16 +275,24 @@
             assert(buff->dataStart+buff->dataCount <= buff->byteCount);
             int rc = h.getSocket().write(buff->bytes+buff->dataStart, buff->dataCount);
             if (rc >= 0) {
+                threadWriteTotal += rc;
+                writeTotal += rc;
+
                 // If we didn't write full buffer put rest back
                 if (rc != buff->dataCount) {
                     buff->dataStart += rc;
                     buff->dataCount -= rc;
                     writeQueue.push_back(buff);
-                    return;
+                    break;
                 }
                 
                 // Recycle the buffer
                 queueReadBuffer(buff);
+                
+                // If we've already written more than the max for reading then stop
+                // (this is to stop writes dominating reads) 
+                if (writeTotal > threadMaxRead)
+                    break;
             } else {
                 // Put buffer back
                 writeQueue.push_back(buff);
@@ -261,11 +300,11 @@
                     // Just stop watching for write here - we'll get a
                     // disconnect callback soon enough
                     h.unwatchWrite();
-                    return;
+                    break;
                 } else if (errno == EAGAIN) {
                     // We have just put a buffer back so we know
                     // we can carry on watching for writes
-                    return;
+                    break;
                 } else {
                     QPID_POSIX_CHECK(rc);
                 }
@@ -274,7 +313,7 @@
             // If we're waiting to close the socket then can do it now as there is nothing to write
             if (queuedClose) {
                 close(h);
-                return;
+                break;
             }
             // Fd is writable, but nothing to write
             if (idleCallback) {
@@ -284,15 +323,19 @@
             // If we still have no buffers to write we can't do anything more
             if (writeQueue.empty() && !writePending && !queuedClose) {
                 h.unwatchWrite();
-                //the following handles the case where writePending is
-                //set to true after the test above; in this case its
-                //possible that the unwatchWrite overwrites the
-                //desired rewatchWrite so we correct that here
-                if (writePending) h.rewatchWrite();
-                return;
+                // The following handles the case where writePending is
+                // set to true after the test above; in this case its
+                // possible that the unwatchWrite overwrites the
+                // desired rewatchWrite so we correct that here
+                if (writePending)
+                    h.rewatchWrite();
+                break;
             }
         }
     } while (true);
+
+    ++threadWriteCount;
+    return;
 }
         
 void AsynchIO::disconnected(DispatchHandle& h) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Time.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Time.cpp?rev=604983&r1=604982&r2=604983&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Time.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Time.cpp Mon Dec 17 12:08:46 2007
@@ -31,7 +31,7 @@
 
 AbsTime AbsTime::now() {
     struct timespec ts;
-    clock_gettime(CLOCK_REALTIME, &ts);
+    ::clock_gettime(CLOCK_REALTIME, &ts);
     AbsTime time_now;
     time_now.time_ns = toTime(ts).nanosecs;
     return time_now;



Mime
View raw message