qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r585085 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: broker/Queue.cpp broker/SemanticState.cpp sys/posix/AsynchIO.cpp
Date Tue, 16 Oct 2007 09:12:01 GMT
Author: gsim
Date: Tue Oct 16 02:11:48 2007
New Revision: 585085

URL: http://svn.apache.org/viewvc?rev=585085&view=rev
Log:
* Revised allocation algorithm to ensure all consumers are given the opportunity to consume
a message
* If already have infinit credit, don't try to add to it
* If get disconnected while processing close, just finish off the close and don't signal the
disconnection


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=585085&r1=585084&r2=585085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Oct 16 02:11:48 2007
@@ -155,12 +155,14 @@
 bool Queue::dispatch(QueuedMessage& msg)
 {
     Consumer* c = allocate();
-    int start = next;
+    Consumer* first = c;
     while(c){
         if(c->deliver(msg)) {
             return true;            
+        } else {
+            c = allocate();
+            if (c == first) c = 0;
         }
-        c = next == start ? 0 : allocate();            
     }
     return false;
 }
@@ -170,7 +172,10 @@
      while(true){
         {
 	    Mutex::ScopedLock locker(messageLock);
-	    if (messages.empty()) break; 
+	    if (messages.empty()) { 
+                QPID_LOG(debug, "No messages to dispatch on queue '" << name <<
"'");
+                break; 
+            }
 	    msg = messages.front();
 	}
         if( msg.payload->isEnqueueComplete() && dispatch(msg) ) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=585085&r1=585084&r2=585085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Oct 16 02:11:48 2007
@@ -267,7 +267,8 @@
 {
     Mutex::ScopedLock l(lock);
     if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit()))
{
-        QPID_LOG(debug, "Not enough credit for '" << name << "', bytes: " <<
byteCredit << " msgs: " << msgCredit);
+        QPID_LOG(debug, "Not enough credit for '" << name  << "' on " <<
parent 
+                 << ", bytes: " << byteCredit << " msgs: " << msgCredit);
         return false;
     } else {
         uint32_t originalMsgCredit = msgCredit;
@@ -279,8 +280,8 @@
         if (byteCredit != 0xFFFFFFFF) {
             byteCredit -= msg->getRequiredCredit();
         }
-        QPID_LOG(debug, "Credit available for '" << name 
-                 << "', was " << " bytes: " << originalByteCredit <<
" msgs: " << originalMsgCredit
+        QPID_LOG(debug, "Credit available for '" << name << "' on " <<
parent
+                 << ", was " << " bytes: " << originalByteCredit <<
" msgs: " << originalMsgCredit
                  << " now bytes: " << byteCredit << " msgs: " <<
msgCredit);
         return true;
     }
@@ -519,7 +520,9 @@
 {
     {
         Mutex::ScopedLock l(lock);
-        byteCredit += value;
+        if (byteCredit != 0xFFFFFFFF) {
+            byteCredit += value;
+        }
     }
     requestDispatch();
 }
@@ -528,7 +531,9 @@
 {
     {
         Mutex::ScopedLock l(lock);
-        msgCredit += value;
+        if (msgCredit != 0xFFFFFFFF) {
+            msgCredit += value;
+        }
     }
     requestDispatch();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=585085&r1=585084&r2=585085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Tue Oct 16 02:11:48 2007
@@ -292,12 +292,10 @@
 }
         
 void AsynchIO::disconnected(DispatchHandle& h) {
-	// If we've already queued close do it before callback
-	if (queuedClose) {
-		close(h);
-	}
-	
-    if (disCallback) {
+    // If we've already queued close do it instead of disconnected callback
+    if (queuedClose) {
+        close(h);
+    } else if (disCallback) {
         disCallback(*this);
         h.unwatch();
     }



Mime
View raw message