qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1074332 - in /qpid/trunk/qpid/cpp/src/qpid: broker/Connection.cpp broker/Connection.h cluster/Connection.cpp cluster/Connection.h cluster/OutputInterceptor.cpp
Date Thu, 24 Feb 2011 22:35:53 GMT
Author: kgiusti
Date: Thu Feb 24 22:35:53 2011
New Revision: 1074332

URL: http://svn.apache.org/viewvc?rev=1074332&view=rev
Log:
QPID-3084: apply Alan's fix to allow io callbacks to run during a cluster update.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1074332&r1=1074331&r2=1074332&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Thu Feb 24 22:35:53 2011
@@ -345,17 +345,21 @@ void Connection::closed(){ // Physically
     }
 }
 
+void Connection::doIoCallbacks() {
+    {
+        ScopedLock<Mutex> l(ioCallbackLock);
+        while (!ioCallbacks.empty()) {
+            boost::function0<void> cb = ioCallbacks.front();
+            ioCallbacks.pop();
+            ScopedUnlock<Mutex> ul(ioCallbackLock);
+            cb(); // Lend the IO thread for management processing
+        }
+    }
+}
+
 bool Connection::doOutput() {
     try {
-        {
-            ScopedLock<Mutex> l(ioCallbackLock);
-            while (!ioCallbacks.empty()) {
-                boost::function0<void> cb = ioCallbacks.front();
-                ioCallbacks.pop();
-                ScopedUnlock<Mutex> ul(ioCallbackLock);
-                cb(); // Lend the IO thread for management processing
-            }
-        }
+        doIoCallbacks();
         if (mgmtClosing) {
             closed();
             close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request");
@@ -475,8 +479,8 @@ void Connection::OutboundFrameTracker::a
 void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); }
 void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit);
}
 void Connection::OutboundFrameTracker::send(framing::AMQFrame& f)
-{ 
-    next->send(f); 
+{
+    next->send(f);
     con.sent(f);
 }
 void Connection::OutboundFrameTracker::wrap(sys::ConnectionOutputHandlerPtr& p)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=1074332&r1=1074331&r2=1074332&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Thu Feb 24 22:35:53 2011
@@ -153,13 +153,16 @@ class Connection : public sys::Connectio
     void addManagementObject();
 
     const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
-    { 
+    {
         return securitySettings;
     }
 
     /** @return true if the initial connection negotiation is complete. */
     bool isOpen();
 
+    // Used by cluster during catch-up, see cluster::OutputInterceptor
+    void doIoCallbacks();
+
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
     typedef std::vector<boost::shared_ptr<Queue> >::iterator queue_iterator;
@@ -201,7 +204,7 @@ class Connection : public sys::Connectio
         sys::ConnectionOutputHandler* next;
     };
     OutboundFrameTracker outboundTracker;
-    
+
 
     void sent(const framing::AMQFrame& f);
   public:

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1074332&r1=1074331&r2=1074332&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Feb 24 22:35:53 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -143,7 +143,7 @@ void Connection::init() {
 // Called when we have consumed a read buffer to give credit to the
 // connection layer to continue reading.
 void Connection::giveReadCredit(int credit) {
-    if (cluster.getSettings().readMax && credit) 
+    if (cluster.getSettings().readMax && credit)
         output.giveReadCredit(credit);
 }
 
@@ -201,7 +201,7 @@ void Connection::received(framing::AMQFr
     }
     else {             // Shadow or updated catch-up connection.
         if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>())
{
-            if (isShadow()) 
+            if (isShadow())
                 cluster.addShadowConnection(this);
             AMQFrame ok((ConnectionCloseOkBody()));
             connection->getOutput().send(ok);
@@ -241,7 +241,7 @@ void Connection::deliverDoOutput(uint32_
 void Connection::deliveredFrame(const EventFrame& f) {
     GiveReadCreditOnExit gc(*this, f.readCredit);
     assert(!catchUp);
-    currentChannel = f.frame.getChannel(); 
+    currentChannel = f.frame.getChannel();
     if (f.frame.getBody()       // frame can be emtpy with just readCredit
         && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection
contol.
         && !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
@@ -287,7 +287,7 @@ void Connection::deliverClose () {
     cluster.erase(self);
 }
 
-// Close the connection 
+// Close the connection
 void Connection::close() {
     if (connection.get()) {
         QPID_LOG(debug, cluster << " closed connection " << *this);
@@ -332,9 +332,9 @@ size_t Connection::decode(const char* da
         if (!checkProtocolHeader(ptr, size)) // Updates ptr
             return 0; // Incomplete header
 
-        if (!connection->isOpen()) 
+        if (!connection->isOpen())
             processInitialFrames(ptr, end-ptr); // Updates ptr
-        
+
         if (connection->isOpen() && end - ptr > 0) {
             // We're multi-casting, we will give read credit on delivery.
             grc.credit = 0;
@@ -432,7 +432,7 @@ void Connection::sessionState(
         unknownCompleted,
         receivedIncomplete);
     QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
-    // The output tasks will be added later in the update process. 
+    // The output tasks will be added later in the update process.
     connection->getOutputTasks().removeAll();
 }
 
@@ -478,7 +478,7 @@ void Connection::retractOffer() {
 
 void Connection::closeUpdated() {
     self.second = 0;      // Mark this as completed update connection.
-    if (connection.get()) 
+    if (connection.get())
         connection->close(connection::CLOSE_CODE_NORMAL, "OK");
 }
 
@@ -529,7 +529,7 @@ void Connection::deliveryRecord(const st
             m = getUpdateMessage();
             m.queue = queue.get();
             m.position = position;
-            if (enqueued) queue->updateEnqueued(m); //inform queue of the message 
+            if (enqueued) queue->updateEnqueued(m); //inform queue of the message
         } else {                // Message at original position in original queue
             m = queue->find(position);
         }
@@ -591,7 +591,7 @@ void Connection::txEnqueue(const std::st
 
 void Connection::txPublish(const framing::Array& queues, bool delivered) {
     boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload));
-    for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) 
+    for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
         txPub->deliverTo(findQueue((*i)->get<std::string>()));
     txPub->delivered = delivered;
     txBuffer->enlist(txPub);
@@ -678,6 +678,12 @@ void Connection::config(const std::strin
     else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
 }
 
+void Connection::doCatchupIoCallbacks() {
+    // We need to process IO callbacks during the catch-up phase in
+    // order to service asynchronous completions for messages
+    // transferred during catch-up.
 
+    if (catchUp) getBrokerConnection()->doIoCallbacks();
+}
 }} // Namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=1074332&r1=1074331&r2=1074332&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Feb 24 22:35:53 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -62,7 +62,7 @@ class Connection :
         public sys::ConnectionInputHandler,
         public framing::AMQP_AllOperations::ClusterConnectionHandler,
         private broker::Connection::ErrorListener
-        
+
 {
   public:
 
@@ -73,7 +73,7 @@ class Connection :
     Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string&
mgmtId, const ConnectionId& id,
                const qpid::sys::SecuritySettings& external);
     ~Connection();
-    
+
     ConnectionId getId() const { return self; }
     broker::Connection* getBrokerConnection() { return connection.get(); }
     const broker::Connection* getBrokerConnection() const { return connection.get(); }
@@ -108,9 +108,9 @@ class Connection :
     void deliveredFrame(const EventFrame&);
 
     void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const
qpid::framing::SequenceNumber& position);
-    
+
     // ==== Used in catch-up mode to build initial state.
-    // 
+    //
     // State update methods.
     void shadowPrepare(const std::string&);
 
@@ -123,9 +123,9 @@ class Connection :
                       const framing::SequenceNumber& received,
                       const framing::SequenceSet& unknownCompleted,
                       const SequenceSet& receivedIncomplete);
-    
+
     void outputTask(uint16_t channel, const std::string& name);
-    
+
     void shadowReady(uint64_t memberId,
                      uint64_t connectionId,
                      const std::string& managementId,
@@ -189,6 +189,8 @@ class Connection :
 
     void setSecureConnection ( broker::SecureConnection * sc );
 
+    void doCatchupIoCallbacks();
+
   private:
     struct NullFrameHandler : public framing::FrameHandler {
         void handle(framing::AMQFrame&) {}
@@ -233,7 +235,7 @@ class Connection :
     // Error listener functions
     void connectionError(const std::string&);
     void sessionError(uint16_t channel, const std::string&);
-    
+
     void init();
     bool checkUnsupported(const framing::AMQBody& body);
     void deliverDoOutput(uint32_t limit);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=1074332&r1=1074331&r2=1074332&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu Feb 24 22:35:53 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -66,11 +66,14 @@ void OutputInterceptor::giveReadCredit(i
 }
 
 // Called in write thread when the IO layer has no more data to write.
-// We do nothing in the write thread, we run doOutput only on delivery
-// of doOutput requests.
-bool OutputInterceptor::doOutput() { return false; }
+// We only process IO callbacks in the write thread during catch-up.
+// Normally we run doOutput only on delivery of doOutput requests.
+bool OutputInterceptor::doOutput() {
+    parent.doCatchupIoCallbacks();
+    return false;
+}
 
-// Send output up to limit, calculate new limit. 
+// Send output up to limit, calculate new limit.
 void OutputInterceptor::deliverDoOutput(uint32_t limit) {
     sentDoOutput = false;
     sendMax = limit;
@@ -78,7 +81,7 @@ void OutputInterceptor::deliverDoOutput(
     if (parent.isLocal()) {
         size_t buffered = getBuffered();
         if (buffered == 0 && sent == sendMax) // Could have sent more, increase the
limit.
-            newLimit = sendMax*2; 
+            newLimit = sendMax*2;
         else if (buffered > 0 && sent > 1) // Data left unsent, reduce the
limit.
             newLimit = (sendMax + sent) / 2;
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message