qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1352999 - in /qpid/trunk/qpid/cpp/src: qpid/broker/Link.cpp qpid/ha/Backup.cpp qpid/ha/Backup.h qpid/ha/BrokerReplicator.cpp qpid/ha/BrokerReplicator.h qpid/ha/HaBroker.cpp qpid/ha/HaBroker.h qpid/ha/Primary.cpp tests/ha_tests.py
Date Fri, 22 Jun 2012 19:28:21 GMT
Author: aconway
Date: Fri Jun 22 19:28:20 2012
New Revision: 1352999

URL: http://svn.apache.org/viewvc?rev=1352999&view=rev
Log:
QPID-4078: Fix primary self-connections in long running test.

Assert to detect self-connection were triggered in log runs of ha_tests.py
test_failover_send_receive. Fix:

- HaBroker close backup link before removing broker-info for outgoing link.
- HaBroker removes own address from failover addresses.
- Link.cpp: Skip ioThreadProcessing and maintenanceVisit on a link that is closed.
- Minor improvements to log messages and comments.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1352999&r1=1352998&r2=1352999&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Fri Jun 22 19:28:20 2012
@@ -445,7 +445,7 @@ void Link::ioThreadProcessing()
 {
     Mutex::ScopedLock mutex(lock);
 
-    if (state != STATE_OPERATIONAL)
+    if (state != STATE_OPERATIONAL || closing)
         return;
 
     // check for bridge session errors and recover
@@ -482,7 +482,7 @@ void Link::ioThreadProcessing()
 void Link::maintenanceVisit ()
 {
     Mutex::ScopedLock mutex(lock);
-
+    if (closing) return;
     if (state == STATE_WAITING)
     {
         visitCount++;
@@ -500,7 +500,7 @@ void Link::maintenanceVisit ()
     }
     else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() ||
!cancellations.empty()) && connection != 0)
         connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
-    }
+}
 
 void Link::reconnectLH(const Address& a)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1352999&r1=1352998&r2=1352999&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Fri Jun 22 19:28:20 2012
@@ -56,25 +56,24 @@ bool Backup::isSelf(const Address& a) co
         a.port == haBroker.getBroker().getPort(a.protocol);
 }
 
-Url Backup::linkUrl(const Url& brokers) const {
-    return brokers;
-    /** FIXME aconway 2012-05-29: Problems with self-test, false positives.
-    // linkUrl contains only the addresses of *
-    other* brokers, not this one.
+// Remove my own address from the URL if possible.
+// This isn't 100% reliable given the many ways to specify a host,
+// but should work in most cases. We have additional measures to prevent
+// self-connection in ConnectionObserver
+Url Backup::removeSelf(const Url& brokers) const {
     Url url;
     for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
         if (!isSelf(*i)) url.push_back(*i);
-    if (url.empty()) throw Url::Invalid("HA Backup failover URL is empty");
-    QPID_LOG(debug, logPrefix << " failover URL (excluding self): " << url);
+    if (url.empty())
+        throw Url::Invalid(logPrefix+"Failover URL is empty");
+    QPID_LOG(debug, logPrefix << "Failover URL (excluding self): " << url);
     return url;
-    */
 }
 
 void Backup::initialize(const Url& brokers) {
     if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
     QPID_LOG(info, logPrefix << "Initialized, broker URL: " << brokers);
-    sys::Mutex::ScopedLock l(lock);
-    Url url = linkUrl(brokers);
+    Url url = removeSelf(brokers);
     string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
     types::Uuid uuid(true);
     // Declare the link
@@ -93,8 +92,6 @@ void Backup::initialize(const Url& broke
 
 Backup::~Backup() {
     if (link) link->close();
-    // FIXME aconway 2012-05-30: race: may have outstanding initializeBridge calls
-    // pointing to this.
     if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
     replicator.reset();
 }
@@ -103,12 +100,15 @@ Backup::~Backup() {
 void Backup::setBrokerUrl(const Url& url) {
     // Ignore empty URLs seen during start-up for some tests.
     if (url.empty()) return;
-    sys::Mutex::ScopedLock l(lock);
-    if (link) {
-        QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
-        link->setUrl(linkUrl(url));
+    {
+        sys::Mutex::ScopedLock l(lock);
+        if (link) {
+            QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
+            link->setUrl(removeSelf(url));
+            return;
+        }
     }
-    else initialize(url);        // Deferred initialization
+    initialize(url);        // Deferred initialization
 }
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h?rev=1352999&r1=1352998&r2=1352999&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h Fri Jun 22 19:28:20 2012
@@ -53,7 +53,7 @@ class Backup
 
   private:
     bool isSelf(const Address& a) const;
-    Url linkUrl(const Url&) const;
+    Url removeSelf(const Url&) const;
     void initialize(const Url&);
 
     std::string logPrefix;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1352999&r1=1352998&r2=1352999&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Jun 22 19:28:20 2012
@@ -170,8 +170,9 @@ Variant::Map asMapVoid(const Variant& va
 
 BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>&
l)
     : Exchange(QPID_CONFIGURATION_REPLICATOR),
-      logPrefix("Backup configuration: "), replicationTest(hb.getReplicationTest()),
-      haBroker(hb), broker(hb.getBroker()), link(l)
+      logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
+      haBroker(hb), broker(hb.getBroker()), link(l),
+      initialized(false)
 {}
 
 void BrokerReplicator::initialize() {
@@ -202,28 +203,32 @@ BrokerReplicator::~BrokerReplicator() { 
 
 // This is called in the connection IO thread when the bridge is started.
 void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler)
{
+    qpid::Address primary;
+    link->getRemoteAddress(primary);
+    string queueName = bridge.getQueueName();
+
+    QPID_LOG(info, logPrefix << (initialized ? "Connecting" : "Failing-over")
+             << " to primary " << primary
+             << " status:" << printable(haBroker.getStatus()));
+    initialized = true;
 
     switch (haBroker.getStatus()) {
       case JOINING:
         haBroker.setStatus(CATCHUP);
+        break;
       case CATCHUP:
-        // FIXME aconway 2012-04-27: distinguish catchup case, below.
         break;
       case READY:
-        // FIXME aconway 2012-04-27: distinguish ready case, reconnect to other backup.
         break;
       case RECOVERING:
       case ACTIVE:
-        // FIXME aconway 2012-04-27: link is connected to self!
-        // Promotion should close the link before allowing connections.
+        assert(0); // Primary does not reconnect.
         return;
-        break;
       case STANDALONE:
         return;
     }
 
     framing::AMQP_ServerProxy peer(sessionHandler.out);
-    string queueName = bridge.getQueueName();
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
 
     //declare and bind an event queue
@@ -243,9 +248,9 @@ void BrokerReplicator::initializeBridge(
     sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler);
     sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler);
     sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
-    qpid::Address primary;
-    link->getRemoteAddress(primary);
-    QPID_LOG(info, logPrefix << "Connected to " << primary << "(" <<
queueName << ")");
+
+    QPID_LOG(debug, logPrefix << "Connected to primary " << primary
+             << "(" << queueName << ")" << " status:" << printable(haBroker.getStatus()));
 }
 
 void BrokerReplicator::route(Deliverable& msg) {
@@ -571,9 +576,4 @@ bool BrokerReplicator::isBound(boost::sh
 
 string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
 
-void BrokerReplicator::ready() {
-    assert(haBroker.getStatus() == CATCHUP);
-    haBroker.setStatus(READY);
-}
-
 }} // namespace broker

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1352999&r1=1352998&r2=1352999&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Fri Jun 22 19:28:20 2012
@@ -94,13 +94,13 @@ class BrokerReplicator : public broker::
 
     QueueReplicatorPtr findQueueReplicator(const std::string& qname);
     void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
-    void ready();
 
     std::string logPrefix;
     ReplicationTest replicationTest;
     HaBroker& haBroker;
     broker::Broker& broker;
     boost::shared_ptr<broker::Link> link;
+    bool initialized;
 };
 }} // namespace qpid::broker
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1352999&r1=1352998&r2=1352999&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Jun 22 19:28:20 2012
@@ -110,8 +110,11 @@ HaBroker::~HaBroker() {
 }
 
 void HaBroker::recover(Mutex::ScopedLock&) {
+    // No longer replicating, close link. Note: link must be closed before we
+    // setStatus(RECOVERING) as that will remove our broker info from the
+    // outgoing link properties so we won't recognize self-connects.
+    backup.reset();
     setStatus(RECOVERING);
-    backup.reset();                    // No longer replicating, close link.
     BrokerInfo::Set backups = membership.otherBackups();
     membership.reset(brokerInfo);
     // Drop the lock, new Primary may call back on activate.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1352999&r1=1352998&r2=1352999&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Fri Jun 22 19:28:20 2012
@@ -56,7 +56,7 @@ class ConnectionObserver;
 class Primary;
 
 /**
- * HA state and actions associated with a broker.
+ * HA state and actions associated with a HA broker. Holds all the management info.
  *
  * THREAD SAFE: may be called in arbitrary broker IO or timer threads.
  */

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1352999&r1=1352998&r2=1352999&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Fri Jun 22 19:28:20 2012
@@ -70,13 +70,13 @@ Primary::Primary(HaBroker& hb, const Bro
     assert(instance == 0);
     instance = this;            // Let queue replicators find us.
     if (expect.empty()) {
-        QPID_LOG(debug, logPrefix << "Expected backups: none");
+        QPID_LOG(debug, logPrefix << "Promoted, no expected backups");
     }
     else {
         // NOTE: RemoteBackups must be created before we set the ConfigurationObserver
         // orr ConnectionObserver so that there is no client activity while
         // the QueueGuards are created.
-        QPID_LOG(debug, logPrefix << "Expected backups: " << expect);
+        QPID_LOG(debug, logPrefix << "Promoted, expected backups: " << expect);
         for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i)
{
             bool guard = true;  // Create queue guards immediately for expected backups.
             boost::shared_ptr<RemoteBackup> backup(

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1352999&r1=1352998&r2=1352999&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Jun 22 19:28:20 2012
@@ -87,7 +87,7 @@ class HaBroker(Broker):
             # Ignore ConnectionError, the broker may not be up yet.
             try: return self.ha_status() == status;
             except ConnectionError: return False
-        assert retry(try_get_status, timeout=20), "%s, %r != %r"%(self, self.ha_status(),
status)
+        assert retry(try_get_status, timeout=20), "%s status != %r"%(self, status)
 
     # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
     def qpid_config(self, args):



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message