qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r966933 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp Cluster.h
Date Fri, 23 Jul 2010 01:49:00 GMT
Author: aconway
Date: Fri Jul 23 01:48:59 2010
New Revision: 966933

URL: http://svn.apache.org/viewvc?rev=966933&view=rev
Log:
Race condition in cluster+management, inconsistent errors like:

"confirmed < (2097+0) but only sent < (2096+0)"

Management messages are generated if a managed object's properties have
changed since the last update. Properties of the cluster object
(members and status) were sometimes being changed outside the delivery
context which could create inconsistencies in the cluster.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=966933&r1=966932&r2=966933&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jul 23 01:48:59 2010
@@ -316,7 +316,6 @@ void Cluster::initialize() {
     broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
     broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2);
     broker.setExpiryPolicy(expiryPolicy);
-    dispatcher.start();
     deliverEventQueue.bypassOff();
     deliverEventQueue.start();
     deliverFrameQueue.bypassOff();
@@ -329,7 +328,6 @@ void Cluster::initialize() {
         _qmf::Package  packageInit(mAgent);
         mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str());
         mAgent->addObject (mgmtObject);
-        mgmtObject->set_status("JOINING");
     }
 
     // Run initMapCompleted immediately to process the initial configuration
@@ -340,6 +338,8 @@ void Cluster::initialize() {
     // Add finalizer last for exception safety.
     broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
 
+    // Start dispatching CPG events.
+    dispatcher.start();
 }
 
 // Called in connection thread to insert a client connection.
@@ -595,15 +595,26 @@ void Cluster::configChange ( 
 
 void Cluster::setReady(Lock&) {
     state = READY;
-    if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
     mcast.setReady();
     broker.getQueueEvents().enable();
     enableClusterSafe();    // Enable cluster-safe assertions.
 }
 
+// Set the management status from the Cluster::state.
+// 
+// NOTE: Management updates are sent based on property changes.  In
+// order to keep consistency across the cluster, we touch the local
+// management status property even if it is locally unchanged for any
+// event that could have caused a cluster property change on any cluster member.
+void Cluster::setMgmtStatus(Lock&) {
+    if (mgmtObject)
+        mgmtObject->set_status(state >= CATCHUP ? "ACTIVE" : "JOINING");
+}
+
 void Cluster::initMapCompleted(Lock& l) {
-    // Called on completion of the initial status map. 
+    // Called on completion of the initial status map.    
     QPID_LOG(debug, *this << " initial status map complete. ");
+    setMgmtStatus(l);
     if (state == PRE_INIT) {
         // PRE_INIT means we're still in the earlyInitialize phase, in the constructor.
         // We decide here whether we want to recover from our store.
@@ -649,6 +660,7 @@ void Cluster::initMapCompleted(Lock& l) 
             discarding = false;
             setReady(l);
             memberUpdate(l);
+            updateMgmtMembership(l);
             mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
             QPID_LOG(notice, *this << " joined cluster " << name);
         }
@@ -695,6 +707,8 @@ void Cluster::configChange(const MemberI
         memberUpdate(l);
         if (elders.empty()) becomeElder(l);
     }
+
+    updateMgmtMembership(l);     // Update on every config change for consistency
 }
 
 void Cluster::becomeElder(Lock&) {
@@ -788,6 +802,9 @@ void Cluster::ready(const MemberId& id, 
     } catch (const Url::Invalid& e) {
         QPID_LOG(error, "Invalid URL in cluster ready command: " << url);
     }
+     // Update management on every ready event to be consistent across cluster.
+    setMgmtStatus(l);
+    updateMgmtMembership(l);
 }
 
 void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
{
@@ -900,6 +917,8 @@ void Cluster::checkUpdateIn(Lock& l) {
         mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
         state = CATCHUP;
         memberUpdate(l);
+        // NB: don't updateMgmtMembership() here as we are not in the deliver
+        // thread. It will be updated on delivery of the "ready" we just mcast.
         broker.setClusterUpdatee(false);
         if (mAgent) mAgent->suppress(false); // Enable management output.
         discarding = false;     // ok to set, we're stalled for update.
@@ -981,11 +1000,9 @@ void Cluster::memberUpdate(Lock& l) {
     // Ignore config changes while we are joining.
     if (state < CATCHUP) return;
     QPID_LOG(info, *this << " member update: " << map);
-    std::vector<Url> urls = getUrls(l);
-    std::vector<string> ids = getIds(l);
     size_t aliveCount = map.aliveCount();
     assert(map.isAlive(self));
-    failoverExchange->updateUrls(urls);
+    failoverExchange->updateUrls(getUrls(l));
 
     // Mark store clean if I am the only broker, dirty otherwise.
     if (store.hasStore()) {
@@ -1017,22 +1034,6 @@ void Cluster::memberUpdate(Lock& l) {
     }
     lastAliveCount = aliveCount;
 
-    if (mgmtObject) {
-        mgmtObject->set_clusterSize(urls.size()); 
-        string urlstr;
-        for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) {
-            if (iter != urls.begin()) urlstr += ";";
-            urlstr += iter->str();
-        }
-        string idstr;
-        for(std::vector<string>::iterator iter = ids.begin(); iter != ids.end(); iter++ ) {
-            if (iter != ids.begin()) idstr += ";";
-            idstr += (*iter);
-        }
-        mgmtObject->set_members(urlstr);
-        mgmtObject->set_memberIDs(idstr);
-    }
-
     // Close connections belonging to members that have left the cluster.
     ConnectionMap::iterator i = connections.begin();
     while (i != connections.end()) {
@@ -1045,6 +1046,26 @@ void Cluster::memberUpdate(Lock& l) {
     }
 }
 
+// See comment on Cluster::setMgmtStatus
+void Cluster::updateMgmtMembership(Lock& l) {
+    if (!mgmtObject) return;
+    std::vector<Url> urls = getUrls(l);
+    mgmtObject->set_clusterSize(urls.size()); 
+    string urlstr;
+    for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) {
+        if (i != urls.begin()) urlstr += ";";
+        urlstr += i->str();
+    }
+    std::vector<string> ids = getIds(l);
+    string idstr;
+    for(std::vector<string>::iterator i = ids.begin(); i != ids.end(); i++ ) {
+        if (i != ids.begin()) idstr += ";";
+        idstr += *i;
+    }
+    mgmtObject->set_members(urlstr);
+    mgmtObject->set_memberIDs(idstr);
+}
+
 std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
     static const char* STATE[] = {
         "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP",

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=966933&r1=966932&r2=966933&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Jul 23 01:48:59 2010
@@ -196,6 +196,8 @@ class Cluster : private Cpg::Handler, pu
     void requestUpdate(Lock& );
     void initMapCompleted(Lock&);
     void becomeElder(Lock&);
+    void setMgmtStatus(Lock&);
+    void updateMgmtMembership(Lock&);
 
     // == Called in CPG dispatch thread
     void deliver( // CPG deliver callback. 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message