qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r722101 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp Cluster.h
Date Mon, 01 Dec 2008 15:40:15 GMT
Author: aconway
Date: Mon Dec  1 07:40:14 2008
New Revision: 722101

URL: http://svn.apache.org/viewvc?rev=722101&view=rev
Log:
cluster: Queue outgoing multicast events.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=722101&r1=722100&r2=722101&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Dec  1 07:40:14 2008
@@ -99,6 +99,7 @@
         boost::bind(&Cluster::disconnect, this, _1) // disconnect
     ),
     deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
+    mcastQueue(boost::bind(&Event::mcast, _1, boost::cref(name), boost::ref(cpg)), poller),
     mcastId(0),
     mgmtObject(0),
     state(INIT),
@@ -116,6 +117,7 @@
     failoverExchange.reset(new FailoverExchange(this));
     cpgDispatchHandle.startWatch(poller);
     deliverQueue.start();
+    mcastQueue.start();
     QPID_LOG(notice, *this << " joining cluster " << name.str() << " with url=" << myUrl);
     if (useQuorum) quorum.init();
     cpg.join(name);
@@ -176,10 +178,10 @@
     if (state <= CATCHUP && e.isConnection()) {
         // Stall outgoing connection events untill we are fully READY
         QPID_LOG(trace, *this << " MCAST deferred: " << e );
-        mcastQueue.push_back(e); 
+        mcastStallQueue.push_back(e); 
     }
     else 
-        e.mcast(name, cpg);
+        mcastQueue.push(e);
 }
 
 std::vector<Url> Cluster::getUrls() const {
@@ -432,8 +434,8 @@
         state = READY;
         QPID_LOG(notice, *this << " caught up, active cluster member");
         if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
-        for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
-        mcastQueue.clear();
+        for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
+        mcastStallQueue.clear();
     }
 }
 
@@ -545,7 +547,7 @@
 }
 
 void Cluster::memberUpdate(Lock& l) {
-    QPID_LOG(info, *this << " member update, map=" << map);
+    QPID_LOG(info, *this << map.memberCount() << " members: " << map);
     std::vector<Url> urls = getUrls(l);
     size_t size = urls.size();
     failoverExchange->setUrls(urls);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=722101&r1=722100&r2=722101&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Dec  1 07:40:14 2008
@@ -190,8 +190,8 @@
     ConnectionMap connections;
     NoOpConnectionOutputHandler shadowOut;
     sys::DispatchHandle cpgDispatchHandle;
-    PollableEventQueue deliverQueue;
-    PlainEventQueue mcastQueue;
+    PollableEventQueue deliverQueue, mcastQueue;
+    PlainEventQueue mcastStallQueue;
     uint32_t mcastId;
     framing::Uuid clusterId;
 



Mime
View raw message