qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1437771 - in /qpid/trunk/qpid/cpp/src/qpid/ha: Backup.cpp Backup.h BrokerInfo.cpp BrokerInfo.h BrokerReplicator.cpp BrokerReplicator.h HaBroker.cpp HaBroker.h Membership.cpp Membership.h Primary.cpp Primary.h Role.h StandAlone.h
Date Wed, 23 Jan 2013 21:58:04 GMT
Author: aconway
Date: Wed Jan 23 21:58:03 2013
New Revision: 1437771

URL: http://svn.apache.org/viewvc?rev=1437771&view=rev
Log:
NO-JIRA: HA refactor, re-organise code for clarity and thread safety.

Introduce Role base class. Primary and Backup are now subclasses of Role.  Moved
backup/primary specific code from HaBroker to the Backup and Primary roles.

HaBroker always holds a single Role, via a thread-safe RoleHolder.  RoleHolder
ensures atomic transition between roles: the old role is deleted before the new
role is created.

Membership is now independently thread safe, breaking the potential deadlock
between HaBroker and the Roles.

Logging improvements and other minor cleanup.

Added:
    qpid/trunk/qpid/cpp/src/qpid/ha/Role.h
      - copied, changed from r1437742, qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h
    qpid/trunk/qpid/cpp/src/qpid/ha/StandAlone.h
      - copied, changed from r1437742, qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h
Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1437771&r1=1437770&r2=1437771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Wed Jan 23 21:58:03 2013
@@ -20,9 +20,12 @@
  */
 #include "Backup.h"
 #include "BrokerReplicator.h"
+#include "ConnectionObserver.h"
 #include "HaBroker.h"
+#include "Primary.h"
 #include "ReplicatingSubscription.h"
 #include "Settings.h"
+#include "StatusCheck.h"
 #include "qpid/Url.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/broker/Bridge.h"
@@ -44,28 +47,38 @@ using namespace framing;
 using namespace broker;
 using types::Variant;
 using std::string;
+using sys::Mutex;
 
 Backup::Backup(HaBroker& hb, const Settings& s) :
-    logPrefix("Backup: "), haBroker(hb), broker(hb.getBroker()), settings(s)
+    logPrefix("Backup: "), membership(hb.getMembership()), stopped(false),
+    haBroker(hb), broker(hb.getBroker()), settings(s),
+    statusCheck(
+        new StatusCheck(
+            logPrefix, broker.getLinkHearbeatInterval(), hb.getBrokerInfo()))
 {
-    // Empty brokerUrl means delay initialization until seBrokertUrl() is called.
-    if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
+    // Set link properties to tag outgoing links.
+    framing::FieldTable linkProperties = broker.getLinkClientProperties();
+    linkProperties.setTable(
+        ConnectionObserver::BACKUP_TAG, hb.getBrokerInfo().asFieldTable());
+    broker.setLinkClientProperties(linkProperties);
 }
 
-void Backup::initialize(const Url& brokers) {
-    if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
-    QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
-    string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
-    types::Uuid uuid(true);
-    // Declare the link
-    std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
-        broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
-        brokers[0].host, brokers[0].port, protocol,
-        false,                  // durable
-        settings.mechanism, settings.username, settings.password,
-        false);               // no amq.failover - don't want to use client URL.
-    {
-        sys::Mutex::ScopedLock l(lock);
+void Backup::setBrokerUrl(const Url& brokers) {
+    if (brokers.empty()) return;
+    Mutex::ScopedLock l(lock);
+    if (stopped) return;
+    if (haBroker.getStatus() == JOINING) statusCheck->setUrl(brokers);
+    if (!link) {                // Not yet initialized
+        QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
+        string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
+        types::Uuid uuid(true);
+        std::pair<Link::shared_ptr, bool> result;
+        result = broker.getLinks().declare(
+            broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
+            brokers[0].host, brokers[0].port, protocol,
+            false,                  // durable
+            settings.mechanism, settings.username, settings.password,
+            false);               // no amq.failover - don't want to use client URL.
         link = result.first;
         replicator.reset(new BrokerReplicator(haBroker, link));
         replicator->initialize();
@@ -74,8 +87,9 @@ void Backup::initialize(const Url& broke
     link->setUrl(brokers);          // Outside the lock, once set link doesn't change.
 }
 
-Backup::~Backup() {
-    QPID_LOG(debug, logPrefix << "No longer a backup.");
+void Backup::stop(Mutex::ScopedLock&) {
+    if (stopped) return;
+    QPID_LOG(debug, logPrefix << "Leaving backup role.");
     if (link) link->close();
     if (replicator.get()) {
         broker.getExchanges().destroy(replicator->getName());
@@ -84,33 +98,45 @@ Backup::~Backup() {
     }
 }
 
-// Called via management.
-void Backup::setBrokerUrl(const Url& url) {
-    // Ignore empty URLs seen during start-up for some tests.
-    if (url.empty()) return;
-    bool linkSet = false;
+Role* Backup::recover(Mutex::ScopedLock&) {
+    BrokerInfo::Set backups;
     {
-        sys::Mutex::ScopedLock l(lock);
-        linkSet = link;
+        Mutex::ScopedLock l(lock);
+        if (stopped) return 0;
+        stop(l);                 // Stop backup activity before starting primary.
+        QPID_LOG(notice, "Promoting to primary: " << haBroker.getBrokerInfo());
+        // Reset membership before allowing backups to connect.
+        backups = membership.otherBackups();
+        membership.clear();
+        return new Primary(haBroker, backups);
     }
-    if (linkSet)
-        link->setUrl(url);      // Outside lock, once set link doesn't change
-    else
-        initialize(url);        // Deferred initialization
 }
 
-void Backup::setStatus(BrokerStatus status) {
-    switch (status) {
-      case READY:
-        QPID_LOG(notice, logPrefix << "Ready to become primary.");
+Role* Backup::promote() {
+    Mutex::ScopedLock l(lock);
+    if (stopped) return 0;
+    switch (haBroker.getStatus()) {
+      case JOINING:
+        if (statusCheck->canPromote()) return recover(l);
+        else {
+            QPID_LOG(error,
+                     logPrefix << "Joining active cluster, cannot be promoted.");
+            throw Exception("Joining active cluster, cannot be promoted.");
+        }
         break;
       case CATCHUP:
-        QPID_LOG(notice, logPrefix << "Catching up on primary, cannot be promoted.");
+        QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
+        throw Exception("Still catching up, cannot be promoted.");
         break;
+      case READY: return recover(l); break;
       default:
-        // FIXME aconway 2012-12-07: fail
-        assert(0);
+        assert(0);              // Not a valid state for the Backup role..
     }
 }
 
+Backup::~Backup() {
+    Mutex::ScopedLock l(lock);
+    stop(l);
+}
+
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h?rev=1437771&r1=1437770&r2=1437771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h Wed Jan 23 21:58:03 2013
@@ -22,6 +22,7 @@
  *
  */
 
+#include "Role.h"
 #include "Settings.h"
 #include "qpid/Url.h"
 #include "qpid/sys/Mutex.h"
@@ -38,30 +39,41 @@ namespace ha {
 class Settings;
 class BrokerReplicator;
 class HaBroker;
+class StatusCheck;
+class Membership;
 
 /**
- * State associated with a backup broker. Manages connections to primary.
+ * Backup role: Manages connections to primary, replicates  management events and queue contents.
  *
  * THREAD SAFE
  */
-class Backup
+class Backup : public Role
 {
   public:
     Backup(HaBroker&, const Settings&);
     ~Backup();
+
+    std::string getLogPrefix() const { return logPrefix; }
+
     void setBrokerUrl(const Url&);
-    void setStatus(BrokerStatus);
+
+    Role* promote();
 
   private:
-    void initialize(const Url&);
+    void stop(sys::Mutex::ScopedLock&);
+    Role* recover(sys::Mutex::ScopedLock&);
+
     std::string logPrefix;
+    Membership& membership;
 
     sys::Mutex lock;
+    bool stopped;
     HaBroker& haBroker;
     broker::Broker& broker;
     Settings settings;
     boost::shared_ptr<broker::Link> link;
     boost::shared_ptr<BrokerReplicator> replicator;
+    std::auto_ptr<StatusCheck> statusCheck;
 };
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp?rev=1437771&r1=1437770&r2=1437771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp Wed Jan 23 21:58:03 2013
@@ -45,8 +45,9 @@ using framing::FieldTable;
 
 BrokerInfo::BrokerInfo() : port(0), status(JOINING) {}
 
-BrokerInfo::BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) :
-    hostName(host), port(port_), systemId(id), status(JOINING)
+BrokerInfo::BrokerInfo(const types::Uuid& id, BrokerStatus s,
+                       const std::string& host, uint16_t port_) :
+    hostName(host), port(port_), systemId(id), status(s)
 {
     updateLogId();
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h?rev=1437771&r1=1437770&r2=1437771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h Wed Jan 23 21:58:03 2013
@@ -44,7 +44,8 @@ class BrokerInfo
     typedef std::map<types::Uuid, BrokerInfo> Map;
 
     BrokerInfo();
-    BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id);
+    BrokerInfo(const types::Uuid& id, BrokerStatus,
+               const std::string& host=std::string(), uint16_t port=0);
     BrokerInfo(const framing::FieldTable& ft) { assign(ft); }
     BrokerInfo(const types::Variant::Map& m) { assign(m); }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1437771&r1=1437770&r2=1437771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Wed Jan 23 21:58:03 2013
@@ -227,7 +227,9 @@ class BrokerReplicator::UpdateTracker {
     typedef std::set<std::string> Names;
     typedef boost::function<void (const std::string&)> CleanFn;
 
-    UpdateTracker(CleanFn f, const ReplicationTest& rt) : cleanFn(f), repTest(rt) {}
+    UpdateTracker(const std::string& type_, // "queue" or "exchange"
+                  CleanFn f, const ReplicationTest& rt)
+        : type(type_), cleanFn(f), repTest(rt) {}
 
     /** Destructor cleans up remaining initial queues. */
     ~UpdateTracker() {
@@ -264,6 +266,12 @@ class BrokerReplicator::UpdateTracker {
     }
 
   private:
+    void clean(const std::string& name) {
+        QPID_LOG(info, "Backup updated, deleting  " << type << " " << name);
+        cleanFn(name);
+    }
+
+    std::string type;
     Names initial, events;
     CleanFn cleanFn;
     ReplicationTest repTest;
@@ -353,13 +361,15 @@ void BrokerReplicator::initializeBridge(
     initialized = true;
 
     exchangeTracker.reset(
-        new UpdateTracker(boost::bind(&BrokerReplicator::deleteExchange, this, _1),
+        new UpdateTracker("exchange",
+                          boost::bind(&BrokerReplicator::deleteExchange, this, _1),
                           replicationTest));
     exchanges.eachExchange(
         boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1));
 
     queueTracker.reset(
-        new UpdateTracker(boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
+        new UpdateTracker("queue",
+                          boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
                           replicationTest));
     queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1));
 
@@ -394,7 +404,7 @@ void BrokerReplicator::route(Deliverable
     // We transition from JOINING->CATCHUP on the first message received from the primary.
     // Until now we couldn't be sure if we had a good connection to the primary.
     if (haBroker.getStatus() == JOINING) {
-        haBroker.setStatus(CATCHUP);
+        haBroker.getMembership().setStatus(CATCHUP);
         QPID_LOG(notice, logPrefix << "Connected to primary " << primary);
     }
     Variant::List list;
@@ -572,7 +582,7 @@ void BrokerReplicator::doEventUnbind(Var
 
 void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
     Variant::List members = values[MEMBERS].asList();
-    haBroker.setMembership(members);
+    setMembership(members);
 }
 
 void BrokerReplicator::doEventSubscribe(Variant::Map& values) {
@@ -724,7 +734,7 @@ void BrokerReplicator::doResponseHaBroke
         if (mine != primary)
             throw Exception(QPID_MSG("Replicate default on backup (" << mine
                                      << ") does not match primary (" <<  primary << ")"));
-        haBroker.setMembership(values[MEMBERS].asList());
+        setMembership(values[MEMBERS].asList());
     } catch (const std::exception& e) {
         haBroker.shutdown(
             QPID_MSG(logPrefix << "Invalid HA Broker response: " << e.what()
@@ -861,4 +871,25 @@ void BrokerReplicator::disconnected() {
              boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
 }
 
+void BrokerReplicator::setMembership(const Variant::List& brokers) {
+    Membership& membership(haBroker.getMembership());
+    membership.assign(brokers);
+    // Check if the primary has signalled a change in my status:
+    // from CATCHUP to READY when we are caught up.
+    // from READY to CATCHUP if we are timed out during fail-over.
+    BrokerInfo info;
+    if (membership.get(membership.getSelf(), info)) {
+        BrokerStatus oldStatus = haBroker.getStatus();
+        BrokerStatus newStatus = info.getStatus();
+        if (oldStatus == CATCHUP && newStatus == READY) {
+            QPID_LOG(info, logPrefix << logPrefix << "Caught-up and ready");
+            haBroker.getMembership().setStatus(READY);
+        }
+        else if (oldStatus == READY && newStatus == CATCHUP) {
+            QPID_LOG(info, logPrefix << logPrefix << "No longer ready, catching up");
+            haBroker.getMembership().setStatus(CATCHUP);
+        }
+    }
+}
+
 }} // namespace broker

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1437771&r1=1437770&r2=1437771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Wed Jan 23 21:58:03 2013
@@ -136,6 +136,8 @@ class BrokerReplicator : public broker::
     void autoDeleteCheck(boost::shared_ptr<broker::Exchange>);
     void disconnected();
 
+    void setMembership(const types::Variant::List&); // Set membership from list.
+
     std::string logPrefix;
     std::string userId, remoteHost;
     ReplicationTest replicationTest;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1437771&r1=1437770&r2=1437771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Wed Jan 23 21:58:03 2013
@@ -26,7 +26,7 @@
 #include "QueueReplicator.h"
 #include "ReplicatingSubscription.h"
 #include "Settings.h"
-#include "StatusCheck.h"
+#include "StandAlone.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/Exception.h"
 #include "qpid/broker/Broker.h"
@@ -42,7 +42,6 @@
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h"
-#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
 #include "qpid/log/Statement.h"
 #include <boost/shared_ptr.hpp>
 
@@ -56,24 +55,23 @@ using types::Variant;
 using types::Uuid;
 using sys::Mutex;
 using boost::shared_ptr;
+using boost::dynamic_pointer_cast;
 
 // Called in Plugin::earlyInitialize
 HaBroker::HaBroker(broker::Broker& b, const Settings& s)
-    : broker(b),
-      systemId(broker.getSystem()->getSystemId().data()),
+    : systemId(b.getSystem()->getSystemId().data()),
       settings(s),
+      replicationTest(s.replicateDefault.get()),
+      broker(b),
       observer(new ConnectionObserver(*this, systemId)),
-      status(STANDALONE),
-      logPrefix("Broker: "),
-      membership(systemId),
-      replicationTest(s.replicateDefault.get())
+      role(new StandAlone),
+      membership(BrokerInfo(systemId, STANDALONE), *this)
 {
     // If we are joining a cluster we must start excluding clients now,
     // otherwise there's a window for a client to connect before we get to
     // initialize()
     if (settings.cluster) {
-        status = JOINING;
-        QPID_LOG(debug, logPrefix << "Rejecting client connections.");
+        QPID_LOG(debug, role->getLogPrefix() << "Rejecting client connections.");
         shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
         observer->setObserver(excluder, "Backup: ");
         broker.getConnectionObservers().add(observer);
@@ -87,13 +85,16 @@ bool isNone(const std::string& x) { retu
 
 // Called in Plugin::initialize
 void HaBroker::initialize() {
-
     // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
-    brokerInfo = BrokerInfo(
-        broker.getSystem()->getNodeName(),
-        broker.getPort(broker::Broker::TCP_TRANSPORT),
-        systemId);
-    QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
+    membership.add(
+        BrokerInfo(
+            membership.getSelf(),
+            settings.cluster ? JOINING : membership.getStatus(),
+            broker.getSystem()->getNodeName(),
+            broker.getPort(broker::Broker::TCP_TRANSPORT)
+        )
+    );
+    QPID_LOG(notice, role->getLogPrefix() << "Initializing: " << membership.getInfo());
 
     // Set up the management object.
     ManagementAgent* ma = broker.getManagementAgent();
@@ -104,90 +105,34 @@ void HaBroker::initialize() {
     mgmtObject->set_replicateDefault(settings.replicateDefault.str());
     mgmtObject->set_systemId(systemId);
     ma->addObject(mgmtObject);
+    membership.setMgmtObject(mgmtObject);
 
     // Register a factory for replicating subscriptions.
     broker.getConsumerFactories().add(
-        boost::shared_ptr<ReplicatingSubscription::Factory>(
+        shared_ptr<ReplicatingSubscription::Factory>(
             new ReplicatingSubscription::Factory()));
 
     // If we are in a cluster, start as backup in joining state.
     if (settings.cluster) {
-        status = JOINING;
-        backup.reset(new Backup(*this, settings));
+        assert(membership.getStatus() == JOINING);
+        role.reset(new Backup(*this, settings));
         broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
-        statusCheck.reset(new StatusCheck(logPrefix, broker.getLinkHearbeatInterval(), brokerInfo));
         if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl));
         if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl));
     }
-
-
-    // NOTE: lock is not needed in a constructor, but create one
-    // to pass to functions that have a ScopedLock parameter.
-    Mutex::ScopedLock l(lock);
-    statusChanged(l);
 }
 
 HaBroker::~HaBroker() {
-    QPID_LOG(notice, logPrefix << "Shut down");
+    QPID_LOG(notice, role->getLogPrefix() << "Shut down");
     broker.getConnectionObservers().remove(observer);
 }
 
-// Called from ManagementMethod on promote.
-void HaBroker::recover() {
-    boost::shared_ptr<Backup> b;
-    BrokerInfo::Set backups;
-   {
-        Mutex::ScopedLock l(lock);
-        if (isPrimary(status)) {
-            QPID_LOG(info, "Ignoring promotion, already primary: " << brokerInfo);
-            return;
-        }
-        QPID_LOG(notice, "Promoting to primary: " << brokerInfo);
-        // Reset membership before allowing backups to connect.
-        backups = membership.otherBackups();
-        membership.reset(brokerInfo);
-        // No longer replicating, close link. Note: link must be closed before we
-        // setStatus(RECOVERING) as that will remove our broker info from the
-        // outgoing link properties so we won't recognize self-connects.
-        b = backup;
-        backup.reset();         // Reset in lock.
-    }
-    b.reset();                  // Call destructor outside of lock.
-     {
-        Mutex::ScopedLock l(lock);
-        setStatus(RECOVERING, l);
-        // Drop the lock, new Primary may call back on activate.
-    }
-    // Outside of lock, may call back on activate()
-    primary.reset(new Primary(*this, backups)); // Starts primary-ready check.
-}
-
-// Called back from Primary active check.
-void HaBroker::activate() { setStatus(ACTIVE); }
-
 Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
     switch (methodId) {
       case _qmf::HaBroker::METHOD_PROMOTE: {
-          switch (getStatus()) {
-            case JOINING:
-              if (statusCheck->canPromote())
-                  recover();
-              else {
-                  QPID_LOG(error,
-                           logPrefix << "Joining active cluster, cannot be promoted.");
-                  throw Exception("Cluster already active, cannot be promoted.");
-              }
-              break;
-             case CATCHUP:
-              QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
-              throw Exception("Still catching up, cannot be promoted.");
-              break;
-            case READY: recover(); break;
-            case RECOVERING: break;
-            case ACTIVE: break;
-            case STANDALONE: break;
-          }
-          break;
+        Role* r = role->promote();
+        if (r) role.reset(r);
+        break;
       }
       case _qmf::HaBroker::METHOD_SETBROKERSURL: {
           setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokersUrl&>(args).i_url));
@@ -200,10 +145,10 @@ Manageable::status_t HaBroker::Managemen
       case _qmf::HaBroker::METHOD_REPLICATE: {
           _qmf::ArgsHaBrokerReplicate& bq_args =
               dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
-          QPID_LOG(debug, logPrefix << "Replicate individual queue "
+          QPID_LOG(debug, role->getLogPrefix() << "Replicate individual queue "
                    << bq_args.i_queue << " from " << bq_args.i_broker);
 
-          boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
+          shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
           Url url(bq_args.i_broker);
           string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
           Uuid uuid(true);
@@ -213,10 +158,10 @@ Manageable::status_t HaBroker::Managemen
               false,              // durable
               settings.mechanism, settings.username, settings.password,
               false);           // no amq.failover - don't want to use client URL.
-          boost::shared_ptr<broker::Link> link = result.first;
+          shared_ptr<broker::Link> link = result.first;
           link->setUrl(url);
           // Create a queue replicator
-          boost::shared_ptr<QueueReplicator> qr(
+          shared_ptr<QueueReplicator> qr(
               new QueueReplicator(*this, queue, link));
           qr->activate();
           broker.getExchanges().registerExchange(qr);
@@ -235,20 +180,17 @@ void HaBroker::setPublicUrl(const Url& u
     mgmtObject->set_publicUrl(url.str());
     knownBrokers.clear();
     knownBrokers.push_back(url);
-    QPID_LOG(debug, logPrefix << "Setting public URL to: " << url);
+    QPID_LOG(debug, role->getLogPrefix() << "Setting public URL to: " << url);
 }
 
 void HaBroker::setBrokerUrl(const Url& url) {
-    boost::shared_ptr<Backup> b;
     {
         Mutex::ScopedLock l(lock);
         brokerUrl = url;
         mgmtObject->set_brokersUrl(brokerUrl.str());
-        QPID_LOG(info, logPrefix << "Brokers URL set to: " << url);
-        if (status == JOINING && statusCheck.get()) statusCheck->setUrl(url);
-        b = backup;
+        QPID_LOG(info, role->getLogPrefix() << "Brokers URL set to: " << url);
     }
-    if (b) b->setBrokerUrl(url); // Oustside lock, avoid deadlock
+    role->setBrokerUrl(url); // Oustside lock
 }
 
 std::vector<Url> HaBroker::getKnownBrokers() const {
@@ -263,110 +205,7 @@ void HaBroker::shutdown(const std::strin
 }
 
 BrokerStatus HaBroker::getStatus() const {
-    return status;
-}
-
-void HaBroker::setStatus(BrokerStatus newStatus) {
-    Mutex::ScopedLock l(lock);
-    setStatus(newStatus, l);
-}
-
-namespace {
-bool checkTransition(BrokerStatus from, BrokerStatus to) {
-    // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
-    static const BrokerStatus TRANSITIONS[][2] = {
-        { JOINING, CATCHUP },    // Connected to primary
-        { JOINING, RECOVERING }, // Chosen as initial primary.
-        { CATCHUP, READY },      // Caught up all queues, ready to take over.
-        { READY, RECOVERING },   // Chosen as new primary
-        { READY, CATCHUP },      // Timed out failing over, demoted to catch-up.
-        { RECOVERING, ACTIVE }   // All expected backups are ready
-    };
-    static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
-    for (size_t i = 0; i < N; ++i) {
-        if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
-            return true;
-    }
-    return false;
-}
-} // namespace
-
-void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) {
-    QPID_LOG(info, logPrefix << "Status change: "
-             << printable(status) << " -> " << printable(newStatus));
-    bool legal = checkTransition(status, newStatus);
-    assert(legal);
-    if (!legal) {
-        shutdown(QPID_MSG(logPrefix << "Illegal state transition: "
-                 << printable(status) << " -> " << printable(newStatus)));
-    }
-    status = newStatus;
-    statusChanged(l);
-}
-
-void HaBroker::statusChanged(Mutex::ScopedLock& l) {
-    mgmtObject->set_status(printable(status).str());
-    brokerInfo.setStatus(status);
-    membership.add(brokerInfo);
-    membershipUpdated(l);
-    setLinkProperties(l);
-}
-
-void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
-    QPID_LOG(info, logPrefix << "Membership: " <<  membership);
-    Variant::List brokers = membership.asList();
-    mgmtObject->set_members(brokers);
-    broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
-}
-
-void HaBroker::setMembership(const Variant::List& brokers) {
-    boost::shared_ptr<Backup> b;
-    {
-        Mutex::ScopedLock l(lock);
-        membership.assign(brokers);
-        BrokerInfo info;
-        // Update my status to what the primary says it is.  The primary sets
-        // status to READY when we are caught up, and sets status to CATCHUP
-        // (from READY) if we are timed out during recovery.
-        if (membership.get(systemId, info) && status != info.getStatus()) {
-            assert((status == CATCHUP && info.getStatus() == READY) ||
-                   (status == READY && info.getStatus() == CATCHUP));
-            setStatus(info.getStatus(), l);
-            b = backup;
-        }
-        membershipUpdated(l);
-    }
-    if (b) b->setStatus(status); // Oustside lock, avoid deadlock
-}
-
-void HaBroker::addBroker(const BrokerInfo& b) {
-    Mutex::ScopedLock l(lock);
-    membership.add(b);
-    membershipUpdated(l);
-}
-
-void HaBroker::removeBroker(const Uuid& id) {
-    Mutex::ScopedLock l(lock);
-    BrokerInfo info;
-    if (membership.get(id, info)) {
-        membership.remove(id);
-        membershipUpdated(l);
-    }
-}
-
-void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
-    framing::FieldTable linkProperties = broker.getLinkClientProperties();
-    if (isBackup(status)) {
-        // If this is a backup then any outgoing links are backup
-        // links and need to be tagged.
-        linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable());
-    }
-    else {
-        // If this is a primary then any outgoing links are federation links
-        // and should not be tagged.
-        linkProperties.erase(ConnectionObserver::BACKUP_TAG);
-    }
-    broker.setLinkClientProperties(linkProperties);
+    return membership.getStatus();
 }
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1437771&r1=1437770&r2=1437771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Wed Jan 23 21:58:03 2013
@@ -53,12 +53,15 @@ namespace ha {
 class Backup;
 class ConnectionObserver;
 class Primary;
-class StatusCheck;
-
+class Role;
 /**
  * HA state and actions associated with a HA broker. Holds all the management info.
  *
  * THREAD SAFE: may be called in arbitrary broker IO or timer threads.
+
+ * NOTE: HaBroker and Role subclasses follow this lock hierarchy:
+ * - HaBroker MUST NOT hold its own lock across calls to Role subclasses.
+ * - Role subclasses MAY hold their locks across calls to HaBroker.
  */
 class HaBroker : public management::Manageable
 {
@@ -82,53 +85,37 @@ class HaBroker : public management::Mana
     void shutdown(const std::string& message);
 
     BrokerStatus getStatus() const;
-    void setStatus(BrokerStatus);
-    void activate();
-
-    Backup* getBackup() { return backup.get(); }
     ReplicationTest getReplicationTest() const { return replicationTest; }
-
     boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
 
-    const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
-
-    void setMembership(const types::Variant::List&); // Set membership from list.
-    void addBroker(const BrokerInfo& b);       // Add a broker to the membership.
-    void removeBroker(const types::Uuid& id);  // Remove a broker from membership.
-
+    BrokerInfo getBrokerInfo() const { return membership.getInfo(); }
+    Membership& getMembership() { return membership; }
     types::Uuid getSystemId() const { return systemId; }
 
   private:
+
     void setPublicUrl(const Url&);
     void setBrokerUrl(const Url&);
     void updateClientUrl(sys::Mutex::ScopedLock&);
 
-    void setStatus(BrokerStatus, sys::Mutex::ScopedLock&);
-    void recover();
-    void statusChanged(sys::Mutex::ScopedLock&);
-    void setLinkProperties(sys::Mutex::ScopedLock&);
-
     std::vector<Url> getKnownBrokers() const;
 
-    void membershipUpdated(sys::Mutex::ScopedLock&);
-
-    broker::Broker& broker;
-    types::Uuid systemId;
+    // Immutable members
+    const types::Uuid systemId;
     const Settings settings;
 
+    // Member variables protected by lock
     mutable sys::Mutex lock;
-    boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary
-    boost::shared_ptr<Backup> backup;
-    boost::shared_ptr<Primary> primary;
-    qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject;
     Url publicUrl, brokerUrl;
     std::vector<Url> knownBrokers;
-    BrokerStatus status;
-    std::string logPrefix;
-    BrokerInfo brokerInfo;
-    Membership membership;
     ReplicationTest replicationTest;
-    std::auto_ptr<StatusCheck> statusCheck;
+
+    // Independently thread-safe member variables
+    broker::Broker& broker;
+    qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject;
+    boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary
+    boost::shared_ptr<Role> role;
+    Membership membership;
 };
 }} // namespace qpid::ha
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp?rev=1437771&r1=1437770&r2=1437771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp Wed Jan 23 21:58:03 2013
@@ -19,6 +19,12 @@
  *
  */
 #include "Membership.h"
+#include "HaBroker.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
+#include "qmf/org/apache/qpid/ha/HaBroker.h"
 #include <boost/bind.hpp>
 #include <iostream>
 #include <iterator>
@@ -26,37 +32,57 @@
 namespace qpid {
 namespace ha {
 
+namespace _qmf = ::qmf::org::apache::qpid::ha;
 
-void Membership::reset(const BrokerInfo& b) {
+using sys::Mutex;
+using types::Variant;
+
+Membership::Membership(const BrokerInfo& info, HaBroker& b)
+    : haBroker(b), self(info.getSystemId())
+{
+    brokers[self] = info;
+}
+
+void Membership::clear() {
+    Mutex::ScopedLock l(lock);
+    BrokerInfo me = brokers[self];
     brokers.clear();
-    brokers[b.getSystemId()] = b;
+    brokers[self] = me;
 }
 
 void Membership::add(const BrokerInfo& b) {
+    Mutex::ScopedLock l(lock);
     brokers[b.getSystemId()] = b;
+    update(l);
 }
 
 
 void Membership::remove(const types::Uuid& id) {
+    Mutex::ScopedLock l(lock);
     BrokerInfo::Map::iterator i = brokers.find(id);
     if (i != brokers.end()) {
         brokers.erase(i);
-        }
+        update(l);
+    }
 }
 
 bool Membership::contains(const types::Uuid& id) {
+    Mutex::ScopedLock l(lock);
     return brokers.find(id) != brokers.end();
 }
 
 void Membership::assign(const types::Variant::List& list) {
+    Mutex::ScopedLock l(lock);
     brokers.clear();
     for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
         BrokerInfo b(i->asMap());
         brokers[b.getSystemId()] = b;
     }
+    update(l);
 }
 
 types::Variant::List Membership::asList() const {
+    Mutex::ScopedLock l(lock);
     types::Variant::List list;
     for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
         list.push_back(i->second.asMap());
@@ -64,6 +90,7 @@ types::Variant::List Membership::asList(
 }
 
 BrokerInfo::Set Membership::otherBackups() const {
+    Mutex::ScopedLock l(lock);
     BrokerInfo::Set result;
     for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
         if (i->second.getStatus() == READY && i->second.getSystemId() != self)
@@ -71,15 +98,84 @@ BrokerInfo::Set Membership::otherBackups
     return result;
 }
 
-bool Membership::get(const types::Uuid& id, BrokerInfo& result) {
-    BrokerInfo::Map::iterator i = brokers.find(id);
+bool Membership::get(const types::Uuid& id, BrokerInfo& result) const {
+    Mutex::ScopedLock l(lock);
+    BrokerInfo::Map::const_iterator i = brokers.find(id);
     if (i == brokers.end()) return false;
     result = i->second;
     return true;
 }
 
-std::ostream& operator<<(std::ostream& o, const Membership& members) {
-    return o << members.brokers;
+void Membership::update(Mutex::ScopedLock& l) {
+    QPID_LOG(info, "Membership: " <<  brokers);
+    Variant::List brokers = asList();
+    if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str());
+    if (mgmtObject) mgmtObject->set_members(brokers);
+    haBroker.getBroker().getManagementAgent()->raiseEvent(
+        _qmf::EventMembersUpdate(brokers));
+}
+
+void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) {
+    Mutex::ScopedLock l(lock);
+    mgmtObject = mo;
+    update(l);
+}
+
+
+namespace {
+bool checkTransition(BrokerStatus from, BrokerStatus to) {
+    // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
+    static const BrokerStatus TRANSITIONS[][2] = {
+        { STANDALONE, JOINING }, // Initialization of backup broker
+        { JOINING, CATCHUP },    // Connected to primary
+        { JOINING, RECOVERING }, // Chosen as initial primary.
+        { CATCHUP, READY },      // Caught up all queues, ready to take over.
+        { READY, RECOVERING },   // Chosen as new primary
+        { READY, CATCHUP },      // Timed out failing over, demoted to catch-up.
+        { RECOVERING, ACTIVE }   // All expected backups are ready
+    };
+    static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
+    for (size_t i = 0; i < N; ++i) {
+        if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
+            return true;
+    }
+    return false;
+}
+} // namespace
+
+void Membership::setStatus(BrokerStatus newStatus) {
+    BrokerStatus status = getStatus();
+    QPID_LOG(info, "Status change: "
+             << printable(status) << " -> " << printable(newStatus));
+    bool legal = checkTransition(status, newStatus);
+    if (!legal) {
+        haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(status)
+                                 << " -> " << printable(newStatus)));
+    }
+
+    Mutex::ScopedLock l(lock);
+    brokers[self].setStatus(newStatus);
+    if (mgmtObject) mgmtObject->set_status(printable(newStatus).str());
+    update(l);
+}
+
+BrokerStatus Membership::getStatus() const  {
+    Mutex::ScopedLock l(lock);
+    return getStatus(l);
+}
+
+BrokerStatus Membership::getStatus(sys::Mutex::ScopedLock&) const  {
+    BrokerInfo::Map::const_iterator i = brokers.find(self);
+    assert(i != brokers.end());
+    return i->second.getStatus();
+}
+
+BrokerInfo Membership::getInfo() const  {
+    Mutex::ScopedLock l(lock);
+    BrokerInfo::Map::const_iterator i = brokers.find(self);
+    assert(i != brokers.end());
+    return i->second;
 }
 
+// FIXME aconway 2013-01-23: move to .h?
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h?rev=1437771&r1=1437770&r2=1437771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h Wed Jan 23 21:58:03 2013
@@ -24,45 +24,72 @@
 
 #include "BrokerInfo.h"
 #include "types.h"
-#include "qpid/framing/Uuid.h"
 #include "qpid/log/Statement.h"
+#include "qpid/sys/Mutex.h"
 #include "qpid/types/Variant.h"
 #include <boost/function.hpp>
 #include <set>
 #include <vector>
 #include <iosfwd>
+
+namespace qmf { namespace org { namespace apache { namespace qpid { namespace ha {
+class HaBroker;
+}}}}}
+
 namespace qpid {
+
+namespace broker {
+class Broker;
+}
+
+namespace types {
+class Uuid;
+}
+
 namespace ha {
+class HaBroker;
 
 /**
  * Keep track of the brokers in the membership.
- * THREAD UNSAFE: caller must serialize
+ * Send management events when the membership changes.
+ * THREAD SAFE
  */
 class Membership
 {
   public:
-    Membership(const types::Uuid& self_) : self(self_) {}
+    Membership(const BrokerInfo& info, HaBroker&);
+
+    void setMgmtObject(boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker>);
 
-    void reset(const BrokerInfo& b); ///< Reset to contain just one member.
+    void clear();               ///< Clear all but self.
     void add(const BrokerInfo& b);
     void remove(const types::Uuid& id);
     bool contains(const types::Uuid& id);
+
     /** Return IDs of all READY backups other than self */
     BrokerInfo::Set otherBackups() const;
 
     void assign(const types::Variant::List&);
     types::Variant::List asList() const;
 
-    bool get(const types::Uuid& id, BrokerInfo& result);
+    bool get(const types::Uuid& id, BrokerInfo& result) const;
+
+    types::Uuid getSelf() const  { return self; }
+    BrokerInfo getInfo() const;
+    BrokerStatus getStatus() const;
+    void setStatus(BrokerStatus s);
 
   private:
-    types::Uuid self;
+    void update(sys::Mutex::ScopedLock&);
+    BrokerStatus getStatus(sys::Mutex::ScopedLock&) const;
+
+    mutable sys::Mutex lock;
+    HaBroker& haBroker;
+    boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker> mgmtObject;
+    const types::Uuid self;
     BrokerInfo::Map brokers;
-    friend std::ostream& operator<<(std::ostream&, const Membership&);
 };
 
-std::ostream& operator<<(std::ostream&, const Membership&);
-
 }} // namespace qpid::ha
 
 #endif  /*!QPID_HA_MEMBERSHIP_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1437771&r1=1437770&r2=1437771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Wed Jan 23 21:58:03 2013
@@ -82,8 +82,10 @@ class ExpectedBackupTimerTask : public s
 Primary* Primary::instance = 0;
 
 Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
-    haBroker(hb), logPrefix("Primary: "), active(false)
+    haBroker(hb), membership(hb.getMembership()),
+    logPrefix("Primary: "), active(false)
 {
+    hb.getMembership().setStatus(RECOVERING);
     assert(instance == 0);
     instance = this;            // Let queue replicators find us.
     if (expect.empty()) {
@@ -108,11 +110,18 @@ Primary::Primary(HaBroker& hb, const Bro
         hb.getBroker().getTimer().add(timerTask);
     }
 
+
+    // Remove backup tag property from outgoing link properties.
+    framing::FieldTable linkProperties = hb.getBroker().getLinkClientProperties();
+    linkProperties.erase(ConnectionObserver::BACKUP_TAG);
+    hb.getBroker().setLinkClientProperties(linkProperties);
+
     configurationObserver.reset(new PrimaryConfigurationObserver(*this));
     haBroker.getBroker().getConfigurationObservers().add(configurationObserver);
 
     Mutex::ScopedLock l(lock);  // We are now active as a configurationObserver
     checkReady(l);
+
     // Allow client connections
     connectionObserver.reset(new PrimaryConnectionObserver(*this));
     haBroker.getObserver()->setObserver(connectionObserver, logPrefix);
@@ -128,7 +137,7 @@ void Primary::checkReady(Mutex::ScopedLo
         active = true;
         Mutex::ScopedUnlock u(lock); // Don't hold lock across callback
         QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active.");
-        haBroker.activate();
+        membership.setStatus(ACTIVE);
     }
 }
 
@@ -136,7 +145,7 @@ void Primary::checkReady(BackupMap::iter
     if (i != backups.end() && i->second->reportReady()) {
         BrokerInfo info = i->second->getBrokerInfo();
         info.setStatus(READY);
-        haBroker.addBroker(info);
+        membership.add(info);
         if (expectedBackups.erase(i->second)) {
             QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
             checkReady(l);
@@ -164,7 +173,7 @@ void Primary::timeoutExpectedBackups() {
                 // Downgrade the broker's status to CATCHUP
                 // The broker will get this status change when it eventually connects.
                 info.setStatus(CATCHUP);
-                haBroker.addBroker(info);
+                membership.add(info);
             }
             else ++i;
         }
@@ -243,7 +252,7 @@ void Primary::opened(broker::Connection&
             checkReady(i, l);
         }
         if (info.getStatus() == JOINING) info.setStatus(CATCHUP);
-        haBroker.addBroker(info);
+        membership.add(info);
     }
     else
         QPID_LOG(debug, logPrefix << "Accepted client connection "
@@ -260,7 +269,7 @@ void Primary::closed(broker::Connection&
         // Checking  isConnected() lets us ignore such spurious closes.
         if (i != backups.end() && i->second->isConnected()) {
             QPID_LOG(info, logPrefix << "Backup disconnected: " << info);
-            haBroker.removeBroker(info.getSystemId());
+            membership.remove(info.getSystemId());
             expectedBackups.erase(i->second);
             backups.erase(i);
             checkReady(l);
@@ -276,4 +285,9 @@ boost::shared_ptr<QueueGuard> Primary::g
     return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q);
 }
 
+Role* Primary::promote() {
+    QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo());
+    return 0;
+}
+
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h?rev=1437771&r1=1437770&r2=1437771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h Wed Jan 23 21:58:03 2013
@@ -24,6 +24,7 @@
 
 #include "types.h"
 #include "BrokerInfo.h"
+#include "Role.h"
 #include "qpid/sys/Mutex.h"
 #include <boost/shared_ptr.hpp>
 #include <boost/intrusive_ptr.hpp>
@@ -48,6 +49,7 @@ class HaBroker;
 class ReplicatingSubscription;
 class RemoteBackup;
 class QueueGuard;
+class Membership;
 
 /**
  * State associated with a primary broker:
@@ -56,7 +58,7 @@ class QueueGuard;
  *
  * THREAD SAFE: called concurrently in arbitrary connection threads.
  */
-class Primary
+class Primary : public Role
 {
   public:
     typedef boost::shared_ptr<broker::Queue> QueuePtr;
@@ -67,6 +69,11 @@ class Primary
     Primary(HaBroker& hb, const BrokerInfo::Set& expectedBackups);
     ~Primary();
 
+    // Role implementation
+    std::string getLogPrefix() const { return logPrefix; }
+    Role* promote();
+    void setBrokerUrl(const Url&) {}
+
     void readyReplica(const ReplicatingSubscription&);
     void removeReplica(const std::string& q);
 
@@ -94,6 +101,7 @@ class Primary
 
     sys::Mutex lock;
     HaBroker& haBroker;
+    Membership& membership;
     std::string logPrefix;
     bool active;
     /**

Copied: qpid/trunk/qpid/cpp/src/qpid/ha/Role.h (from r1437742, qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Role.h?p2=qpid/trunk/qpid/cpp/src/qpid/ha/Role.h&p1=qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h&r1=1437742&r2=1437771&rev=1437771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Role.h Wed Jan 23 21:58:03 2013
@@ -1,5 +1,5 @@
-#ifndef QPID_HA_BACKUP_H
-#define QPID_HA_BACKUP_H
+#ifndef QPID_HA_ROLE_H
+#define QPID_HA_ROLE_H
 
 /*
  *
@@ -22,48 +22,34 @@
  *
  */
 
-#include "Settings.h"
-#include "qpid/Url.h"
-#include "qpid/sys/Mutex.h"
-#include <boost/shared_ptr.hpp>
+#include <string>
 
 namespace qpid {
-
-namespace broker {
-class Broker;
-class Link;
-}
+class Url;
 
 namespace ha {
-class Settings;
-class BrokerReplicator;
-class HaBroker;
 
 /**
- * State associated with a backup broker. Manages connections to primary.
- *
- * THREAD SAFE
+ * A HaBroker has a role, e.g. Primary, Backup, StandAlone.
+ * Role subclasses define the actions of the broker in each role.
+ * The Role interface allows the HaBroker to pass management actions
+ * to be implemented by the role.
  */
-class Backup
+class Role
 {
   public:
-    Backup(HaBroker&, const Settings&);
-    ~Backup();
-    void setBrokerUrl(const Url&);
-    void setStatus(BrokerStatus);
+    /** Log prefix appropriate to the role */
+    virtual std::string getLogPrefix() const = 0;
 
-  private:
-    void initialize(const Url&);
-    std::string logPrefix;
+    /** QMF promote method handler.
+     * @return The new role if promoted, 0 if not. Caller takes ownership.
+     */
+    virtual Role* promote() = 0;
 
-    sys::Mutex lock;
-    HaBroker& haBroker;
-    broker::Broker& broker;
-    Settings settings;
-    boost::shared_ptr<broker::Link> link;
-    boost::shared_ptr<BrokerReplicator> replicator;
-};
+    virtual void setBrokerUrl(const Url& url) = 0;
 
+  private:
+};
 }} // namespace qpid::ha
 
-#endif  /*!QPID_HA_BACKUP_H*/
+#endif  /*!QPID_HA_ROLE_H*/

Copied: qpid/trunk/qpid/cpp/src/qpid/ha/StandAlone.h (from r1437742, qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StandAlone.h?p2=qpid/trunk/qpid/cpp/src/qpid/ha/StandAlone.h&p1=qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h&r1=1437742&r2=1437771&rev=1437771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StandAlone.h Wed Jan 23 21:58:03 2013
@@ -1,5 +1,5 @@
-#ifndef QPID_HA_BACKUP_H
-#define QPID_HA_BACKUP_H
+#ifndef QPID_HA_STANDALONE_H
+#define QPID_HA_STANDALONE_H
 
 /*
  *
@@ -21,49 +21,25 @@
  * under the License.
  *
  */
-
-#include "Settings.h"
-#include "qpid/Url.h"
-#include "qpid/sys/Mutex.h"
-#include <boost/shared_ptr.hpp>
-
 namespace qpid {
-
-namespace broker {
-class Broker;
-class Link;
-}
+class Url;
 
 namespace ha {
-class Settings;
-class BrokerReplicator;
-class HaBroker;
 
 /**
- * State associated with a backup broker. Manages connections to primary.
- *
- * THREAD SAFE
+ * Stand-alone role: acts as a stand-alone broker, no clustering.
+ * The HA module is needed for setting up replication via QMF methods.
  */
-class Backup
+class StandAlone : public Role
 {
   public:
-    Backup(HaBroker&, const Settings&);
-    ~Backup();
-    void setBrokerUrl(const Url&);
-    void setStatus(BrokerStatus);
+    std::string getLogPrefix() const { return logPrefix; }
+    Role* promote() { return 0; }
+    void setBrokerUrl(const Url&) {}
 
   private:
-    void initialize(const Url&);
     std::string logPrefix;
-
-    sys::Mutex lock;
-    HaBroker& haBroker;
-    broker::Broker& broker;
-    Settings settings;
-    boost::shared_ptr<broker::Link> link;
-    boost::shared_ptr<BrokerReplicator> replicator;
 };
-
 }} // namespace qpid::ha
 
-#endif  /*!QPID_HA_BACKUP_H*/
+#endif  /*!QPID_HA_STANDALONE_H*/



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message