qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1495466 - in /qpid/trunk/qpid/cpp/src: qpid/ha/ tests/
Date Fri, 21 Jun 2013 15:02:09 GMT
Author: aconway
Date: Fri Jun 21 15:02:08 2013
New Revision: 1495466

URL: http://svn.apache.org/r1495466
Log:
QPID-4944: HA Sporadic failure in ha_tests: tes_failover_send_receive and test_expected_backup_timeout

Very sporadic failures so difficult to verify the fix.

- Simplified Membership, centralized status change, make it atomic.
- Fix test bug in test_expected_backup_timeout: not waiting on final status check, race.
- Remove out-of-date status info from log prefixes:  Guard, ReplicatingSubscription

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp Fri Jun 21 15:02:08 2013
@@ -91,13 +91,16 @@ void BrokerInfo::assign(const Variant::M
     status = BrokerStatus(get(m, STATUS).asUint8());
 }
 
-std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {
-    o  << b.getSystemId().str().substr(0,8);
-    if (b.getAddress() != empty) o << "@" << b.getAddress();
-    o << "(" << printable(b.getStatus()) << ")";
+std::ostream& BrokerInfo::printId(std::ostream& o) const {
+    o  << getSystemId().str().substr(0,8);
+    if (getAddress() != empty) o << "@" << getAddress();
     return o;
 }
 
+std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {
+    return b.printId(o) << "(" << printable(b.getStatus()) << ")";
+}
+
 std::ostream& operator<<(std::ostream& o, const BrokerInfo::Set& infos)
{
     std::ostream_iterator<BrokerInfo> out(o, " ");
     copy(infos.begin(), infos.end(), out);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h Fri Jun 21 15:02:08 2013
@@ -64,6 +64,9 @@ class BrokerInfo
     // So it can be put in a set.
     bool operator<(const BrokerInfo x) const { return systemId < x.systemId; }
 
+    // Print just the identifying information, not the status.
+    std::ostream& printId(std::ostream& o) const;
+
   private:
     Address address;
     types::Uuid systemId;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Jun 21 15:02:08 2013
@@ -902,24 +902,7 @@ void BrokerReplicator::disconnected() {
 }
 
 void BrokerReplicator::setMembership(const Variant::List& brokers) {
-    Membership& membership(haBroker.getMembership());
-    membership.assign(brokers);
-    // Check if the primary has signalled a change in my status:
-    // from CATCHUP to READY when we are caught up.
-    // from READY TO CATCHUP if we are timed out during fail-over.
-    BrokerInfo info;
-    if (membership.get(membership.getSelf(), info)) {
-        BrokerStatus oldStatus = haBroker.getStatus();
-        BrokerStatus newStatus = info.getStatus();
-        if (oldStatus == CATCHUP && newStatus == READY) {
-            QPID_LOG(info, logPrefix << logPrefix << "Caught-up and ready");
-            haBroker.getMembership().setStatus(READY);
-        }
-        else if (oldStatus == READY && newStatus == CATCHUP) {
-            QPID_LOG(info, logPrefix << logPrefix << "No longer ready, catching
up");
-            haBroker.getMembership().setStatus(CATCHUP);
-        }
-    }
+    haBroker.getMembership().assign(brokers);
 }
 
 }} // namespace broker

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp Fri Jun 21 15:02:08 2013
@@ -43,6 +43,7 @@ Membership::Membership(const BrokerInfo&
     : haBroker(b), self(info.getSystemId())
 {
     brokers[self] = info;
+    oldStatus = info.getStatus();
 }
 
 void Membership::clear() {
@@ -54,6 +55,7 @@ void Membership::clear() {
 
 void Membership::add(const BrokerInfo& b) {
     Mutex::ScopedLock l(lock);
+    assert(b.getSystemId() != self);
     brokers[b.getSystemId()] = b;
     update(l);
 }
@@ -86,6 +88,10 @@ void Membership::assign(const types::Var
 
 types::Variant::List Membership::asList() const {
     Mutex::ScopedLock l(lock);
+    return asList(l);
+}
+
+types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const {
     types::Variant::List list;
     for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
         list.push_back(i->second.asMap());
@@ -109,18 +115,43 @@ bool Membership::get(const types::Uuid& 
     return true;
 }
 
+namespace {
+bool checkTransition(BrokerStatus from, BrokerStatus to) {
+    // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
+    static const BrokerStatus TRANSITIONS[][2] = {
+        { STANDALONE, JOINING }, // Initialization of backup broker
+        { JOINING, CATCHUP },    // Connected to primary
+        { JOINING, RECOVERING }, // Chosen as initial primary.
+        { CATCHUP, READY },      // Caught up all queues, ready to take over.
+        { READY, RECOVERING },   // Chosen as new primary
+        { READY, CATCHUP },      // Timed out failing over, demoted to catch-up.
+        { RECOVERING, ACTIVE }   // All expected backups are ready
+    };
+    static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
+    for (size_t i = 0; i < N; ++i) {
+        if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
+            return true;
+    }
+    return false;
+}
+} // namespace
+
+
 void Membership::update(Mutex::ScopedLock& l) {
     QPID_LOG(info, "Membership: " <<  brokers);
     // Update managment and send update event.
-    Variant::List brokerList = asList();
-    if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str());
-    if (mgmtObject) mgmtObject->set_members(brokerList);
+    BrokerStatus newStatus = getStatus(l);
+    Variant::List brokerList = asList(l);
+    if (mgmtObject) {
+        mgmtObject->set_status(printable(newStatus).str());
+        mgmtObject->set_members(brokerList);
+    }
     haBroker.getBroker().getManagementAgent()->raiseEvent(
         _qmf::EventMembersUpdate(brokerList));
 
     // Update link client properties
     framing::FieldTable linkProperties = haBroker.getBroker().getLinkClientProperties();
-    if (isBackup(getStatus(l))) {
+    if (isBackup(newStatus)) {
         // Set backup tag on outgoing link properties.
         linkProperties.setTable(
             ConnectionObserver::BACKUP_TAG, brokers[types::Uuid(self)].asFieldTable());
@@ -130,6 +161,17 @@ void Membership::update(Mutex::ScopedLoc
         linkProperties.erase(ConnectionObserver::BACKUP_TAG);
         haBroker.getBroker().setLinkClientProperties(linkProperties);
     }
+
+    // Check status transitions
+    if (oldStatus != newStatus) {
+        QPID_LOG(info, "Status change: "
+                 << printable(oldStatus) << " -> " << printable(newStatus));
+        if (!checkTransition(oldStatus, newStatus)) {
+            haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(oldStatus)
+                                       << " -> " << printable(newStatus)));
+        }
+        oldStatus = newStatus;
+    }
 }
 
 void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) {
@@ -139,40 +181,9 @@ void Membership::setMgmtObject(boost::sh
 }
 
 
-namespace {
-bool checkTransition(BrokerStatus from, BrokerStatus to) {
-    // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
-    static const BrokerStatus TRANSITIONS[][2] = {
-        { STANDALONE, JOINING }, // Initialization of backup broker
-        { JOINING, CATCHUP },    // Connected to primary
-        { JOINING, RECOVERING }, // Chosen as initial primary.
-        { CATCHUP, READY },      // Caught up all queues, ready to take over.
-        { READY, RECOVERING },   // Chosen as new primary
-        { READY, CATCHUP },      // Timed out failing over, demoted to catch-up.
-        { RECOVERING, ACTIVE }   // All expected backups are ready
-    };
-    static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
-    for (size_t i = 0; i < N; ++i) {
-        if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
-            return true;
-    }
-    return false;
-}
-} // namespace
-
 void Membership::setStatus(BrokerStatus newStatus) {
-    BrokerStatus status = getStatus();
-    QPID_LOG(info, "Status change: "
-             << printable(status) << " -> " << printable(newStatus));
-    bool legal = checkTransition(status, newStatus);
-    if (!legal) {
-        haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(status)
-                                 << " -> " << printable(newStatus)));
-    }
-
     Mutex::ScopedLock l(lock);
     brokers[self].setStatus(newStatus);
-    if (mgmtObject) mgmtObject->set_status(printable(newStatus).str());
     update(l);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h Fri Jun 21 15:02:08 2013
@@ -83,12 +83,14 @@ class Membership
   private:
     void update(sys::Mutex::ScopedLock&);
     BrokerStatus getStatus(sys::Mutex::ScopedLock&) const;
+    types::Variant::List asList(sys::Mutex::ScopedLock&) const;
 
     mutable sys::Mutex lock;
     HaBroker& haBroker;
     boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker> mgmtObject;
     const types::Uuid self;
     BrokerInfo::Map brokers;
+    BrokerStatus oldStatus;
 };
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Fri Jun 21 15:02:08 2013
@@ -51,7 +51,8 @@ QueueGuard::QueueGuard(broker::Queue& q,
     : cancelled(false), queue(q)
 {
     std::ostringstream os;
-    os << "Guard of " << queue.getName() << " at " << info <<
": ";
+    os << "Guard of " << queue.getName() << " at ";
+    info.printId(os) << ": ";
     logPrefix = os.str();
     observer.reset(new QueueObserver(*this));
     queue.addObserver(observer);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Jun 21 15:02:08 2013
@@ -121,7 +121,8 @@ ReplicatingSubscription::ReplicatingSubs
 
         // Set a log prefix message that identifies the remote broker.
         ostringstream os;
-        os << "Subscription to " << queue->getName() << " at " <<
info << ": ";
+        os << "Subscription to " << queue->getName() << " at ";
+        info.printId(os) << ": ";
         logPrefix = os.str();
 
         // If this is a non-cluster standalone replication then we need to

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Fri Jun 21 15:02:08 2013
@@ -107,7 +107,7 @@ class HaBroker(Broker):
         ha_port = ha_port or HaPort(test)
         args = copy(args)
         args += ["--load-module", BrokerTest.ha_lib,
-                 "--log-enable=trace+:ha::", # FIXME aconway 2013-06-14: debug+
+                 "--log-enable=debug+:ha::",
                  # Non-standard settings for faster tests.
                  "--link-maintenance-interval=0.1",
                  # Heartbeat and negotiate time are needed so that a broker wont

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Jun 21 15:02:08 2013
@@ -1121,13 +1121,10 @@ class RecoveryTests(HaBrokerTest):
         but can still rejoin.
         """
         cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]);
-        cluster[0].wait_status("active") # Primary ready
-        for b in cluster[1:3]: b.wait_status("ready") # Backups ready
         for i in [0,1]: cluster.kill(i, False)
-        cluster[2].promote()    # New primary, expected backup will 1
-        cluster[2].wait_status("recovering")
+        cluster[2].promote()    # New primary, expected backup will be 1
         # Should not go active till the expected backup connects or times out.
-        self.assertEqual(cluster[2].ha_status(), "recovering")
+        cluster[2].wait_status("recovering")
         # Messages should be held till expected backup times out
         s = cluster[2].connect().session().sender("q;{create:always}")
         s.send("foo", sync=False)
@@ -1135,7 +1132,7 @@ class RecoveryTests(HaBrokerTest):
         try: s.sync(timeout=.01); self.fail("Expected Timeout exception")
         except Timeout: pass
         s.sync(timeout=1)       # And released after the timeout.
-        self.assertEqual(cluster[2].ha_status(), "active")
+        cluster[2].wait_status("active")
 
     def test_join_ready_cluster(self):
         """If we join a cluster where the primary is dead, the new primary is



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message