qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1243577 - in /qpid/branches/qpid-3603-2/qpid/cpp/src: qpid/broker/ qpid/ha/ tests/
Date Mon, 13 Feb 2012 16:17:53 GMT
Author: aconway
Date: Mon Feb 13 16:17:52 2012
New Revision: 1243577

URL: http://svn.apache.org/viewvc?rev=1243577&view=rev
Log:
QPID-3603: Simplified Link failover.

- Moved timer from  LinkRegistry to Link.
- Got rid of remapping code, simplified failover.
- Faster interval for maintenance intervals.
- Test for simple HA broker failover.

Modified:
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/RetryList.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/reliable_replication_test

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp?rev=1243577&r1=1243576&r2=1243577&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp Mon Feb 13 16:17:52 2012
@@ -23,6 +23,7 @@
 #include "qpid/broker/LinkRegistry.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/sys/Timer.h"
 #include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h"
 #include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h"
 #include "boost/bind.hpp"
@@ -31,19 +32,35 @@
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/broker/AclModule.h"
 
-using namespace qpid::broker;
-using qpid::framing::Buffer;
-using qpid::framing::FieldTable;
-using qpid::framing::UnauthorizedAccessException;
-using qpid::framing::connection::CLOSE_CODE_CONNECTION_FORCED;
-using qpid::management::ManagementAgent;
-using qpid::management::ManagementObject;
-using qpid::management::Manageable;
-using qpid::management::Args;
-using qpid::sys::Mutex;
+namespace qpid {
+namespace broker {
+
+using framing::Buffer;
+using framing::FieldTable;
+using framing::UnauthorizedAccessException;
+using framing::connection::CLOSE_CODE_CONNECTION_FORCED;
+using management::ManagementAgent;
+using management::ManagementObject;
+using management::Manageable;
+using management::Args;
+using sys::Mutex;
 using std::stringstream;
 using std::string;
-namespace _qmf = qmf::org::apache::qpid::broker;
+namespace _qmf = ::qmf::org::apache::qpid::broker;
+
+struct LinkTimerTask : public sys::TimerTask {
+    LinkTimerTask(Link& l, sys::Timer& t)
+        : TimerTask(/*FIXME*/100*sys::TIME_MSEC, "Link retry timer"), link(l), timer(t) {}
+
+    void fire() {
+        link.maintenanceVisit();  // FIXME aconway 2012-01-31:
+        setupNextFire();
+        timer.add(this);
+    }
+
+    Link& link;
+    sys::Timer& timer;
+};
 
 Link::Link(LinkRegistry*  _links,
            MessageStore*  _store,
@@ -67,7 +84,8 @@ Link::Link(LinkRegistry*  _links,
       reconnectNext(0),         // Index of next address for reconnecting in url.
       channelCounter(1),
       connection(0),
-      agent(0)
+      agent(0),
+      timerTask(new LinkTimerTask(*this, broker->getTimer()))
 {
     if (parent != 0 && broker != 0)
     {
@@ -80,10 +98,12 @@ Link::Link(LinkRegistry*  _links,
     }
     setStateLH(STATE_WAITING);
     startConnectionLH();
+    broker->getTimer().add(timerTask);
 }
 
 Link::~Link ()
 {
+    timerTask->cancel();
     if (state == STATE_OPERATIONAL && connection != 0)
         connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
 
@@ -121,7 +141,9 @@ void Link::startConnectionLH ()
         broker->connect (host, boost::lexical_cast<std::string>(port), transport,
                          boost::bind (&Link::closed, this, _1, _2));
         QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" <<
port);
-    } catch(std::exception& e) {
+    } catch(const std::exception& e) {
+        QPID_LOG(error, "Link connection to " << host << ":" << port <<
" failed: "
+                 << e.what());
         setStateLH(STATE_WAITING);
         if (!hideManagement())
             mgmtObject->set_lastError (e.what());
@@ -156,14 +178,16 @@ void Link::setUrl(const Url& u) {
 void Link::opened() {
     Mutex::ScopedLock mutex(lock);
     assert(connection);
-    // Get default URL from known-hosts.
-    const std::vector<Url>& known = connection->getKnownHosts();
-    // Flatten vector of URLs into a single URL listing all addresses.
-    url.clear();
-    for(size_t i = 0; i < known.size(); ++i)
-        url.insert(url.end(), known[i].begin(), known[i].end());
-    reconnectNext = 0;
-    QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
+    // Get default URL from known-hosts if not already set
+    if (url.empty()) {
+        const std::vector<Url>& known = connection->getKnownHosts();
+        // Flatten vector of URLs into a single URL listing all addresses.
+        url.clear();
+        for(size_t i = 0; i < known.size(); ++i)
+            url.insert(url.end(), known[i].begin(), known[i].end());
+        reconnectNext = 0;
+        QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
+    }
 }
 
 void Link::closed(int, std::string text)
@@ -176,7 +200,7 @@ void Link::closed(int, std::string text)
     if (state == STATE_OPERATIONAL) {
         stringstream addr;
         addr << host << ":" << port;
-        QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
+        QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str());
         if (!hideManagement() && agent)
             agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
     }
@@ -333,7 +357,7 @@ void Link::maintenanceVisit ()
         connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
     }
 
-void Link::reconnect(const qpid::Address& a)
+void Link::reconnect(const Address& a)
 {
     Mutex::ScopedLock mutex(lock);
     host = a.host;
@@ -347,13 +371,14 @@ void Link::reconnect(const qpid::Address
     }
 }
 
-bool Link::tryFailoverLH() {      // FIXME aconway 2012-01-30: lock held?
+bool Link::tryFailoverLH() {
     if (reconnectNext >= url.size()) reconnectNext = 0;
     if (url.empty()) return false;
     Address next = url[reconnectNext++];
     if (next.host != host || next.port != port || next.protocol != transport) {
         links->changeAddress(Address(transport, host, port), next);
-        QPID_LOG(debug, "Link failing over to " << host << ":" << port);
+        QPID_LOG(debug, "Inter-broker link failing over to " << next.host <<
":" << next.port);
+        reconnect(next);
         return true;
     }
     return false;
@@ -510,3 +535,5 @@ void Link::setPassive(bool passive)
         }
     }
 }
+
+}} // namespace qpid::broker

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h?rev=1243577&r1=1243576&r2=1243577&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h Mon Feb 13 16:17:52 2012
@@ -35,115 +35,122 @@
 #include <boost/ptr_container/ptr_vector.hpp>
 
 namespace qpid {
-    namespace broker {
 
-        class LinkRegistry;
-        class Broker;
-        class Connection;
-
-        class Link : public PersistableConfig, public management::Manageable {
-        private:
-            sys::Mutex          lock;
-            LinkRegistry*       links;
-            MessageStore*       store;
-            std::string        host;
-            uint16_t      port;
-            std::string        transport;
-            bool          durable;
-            std::string        authMechanism;
-            std::string        username;
-            std::string        password;
-            mutable uint64_t    persistenceId;
-            qmf::org::apache::qpid::broker::Link* mgmtObject;
-            Broker* broker;
-            int     state;
-            uint32_t visitCount;
-            uint32_t currentInterval;
-            bool     closing;
-            Url      url;       // URL can contain many addresses.
-            size_t   reconnectNext; // Index for next re-connect attempt
-
-            typedef std::vector<Bridge::shared_ptr> Bridges;
-            Bridges created;   // Bridges pending creation
-            Bridges active;    // Bridges active
-            Bridges cancellations;    // Bridges pending cancellation
-            uint channelCounter;
-            Connection* connection;
-            management::ManagementAgent* agent;
-
-            static const int STATE_WAITING     = 1;
-            static const int STATE_CONNECTING  = 2;
-            static const int STATE_OPERATIONAL = 3;
-            static const int STATE_FAILED      = 4;
-            static const int STATE_CLOSED      = 5;
-            static const int STATE_PASSIVE     = 6;
-
-            static const uint32_t MAX_INTERVAL = 32;
-
-            void setStateLH (int newState);
-            void startConnectionLH();        // Start the IO Connection
-            void destroy();                  // Called when mgmt deletes this link
-            void ioThreadProcessing();       // Called on connection's IO thread by request
-            bool tryFailoverLH();            // Called during maintenance visit
-            bool hideManagement() const;
-
-        public:
-            typedef boost::shared_ptr<Link> shared_ptr;
-
-            Link(LinkRegistry* links,
-                 MessageStore* store,
-                 const std::string&       host,
-                 uint16_t      port,
-                 const std::string&       transport,
-                 bool          durable,
-                 const std::string&       authMechanism,
-                 const std::string&       username,
-                 const std::string&       password,
-                 Broker*       broker,
-                 management::Manageable* parent = 0);
-            virtual ~Link();
-
-            std::string getHost() { return host; }
-            uint16_t    getPort() { return port; }
-            std::string getTransport() { return transport; }
-
-            bool isDurable() { return durable; }
-            void maintenanceVisit ();
-            uint nextChannel();
-            void add(Bridge::shared_ptr);
-            void cancel(Bridge::shared_ptr);
-            void setUrl(const Url&); // Set URL for reconnection.
-
-            void established(); // Called when connection is create
-            void opened();      // Called when connection is open (after create)
-            void closed(int, std::string);   // Called when connection goes away
-            void setConnection(Connection*); // Set pointer to the AMQP Connection
-            void reconnect(const Address&); //called by LinkRegistry
-            void close();       // Close the link from within the broker.
-
-            std::string getAuthMechanism() { return authMechanism; }
-            std::string getUsername()      { return username; }
-            std::string getPassword()      { return password; }
-            Broker* getBroker()       { return broker; }
-
-            void notifyConnectionForced(const std::string text);
-            void setPassive(bool p);
-
-            // PersistableConfig:
-            void     setPersistenceId(uint64_t id) const;
-            uint64_t getPersistenceId() const { return persistenceId; }
-            uint32_t encodedSize() const;
-            void     encode(framing::Buffer& buffer) const;
-            const std::string& getName() const;
-
-            static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer&
buffer);
-
-            // Manageable entry points
-            management::ManagementObject*    GetManagementObject(void) const;
-            management::Manageable::status_t ManagementMethod(uint32_t, management::Args&,
std::string&);
+namespace sys {
+class TimerTask;
+}
+
+namespace broker {
 
-        };
-    }
+class LinkRegistry;
+class Broker;
+class Connection;
+
+class Link : public PersistableConfig, public management::Manageable {
+  private:
+    sys::Mutex          lock;
+    LinkRegistry*       links;
+    MessageStore*       store;
+    std::string        host;
+    uint16_t      port;
+    std::string        transport;
+    bool          durable;
+    std::string        authMechanism;
+    std::string        username;
+    std::string        password;
+    mutable uint64_t    persistenceId;
+    qmf::org::apache::qpid::broker::Link* mgmtObject;
+    Broker* broker;
+    int     state;
+    uint32_t visitCount;
+    uint32_t currentInterval;
+    bool     closing;
+    Url      url;       // URL can contain many addresses.
+    size_t   reconnectNext; // Index for next re-connect attempt
+
+    typedef std::vector<Bridge::shared_ptr> Bridges;
+    Bridges created;   // Bridges pending creation
+    Bridges active;    // Bridges active
+    Bridges cancellations;    // Bridges pending cancellation
+    uint channelCounter;
+    Connection* connection;
+    management::ManagementAgent* agent;
+
+    boost::intrusive_ptr<sys::TimerTask> timerTask;
+
+    static const int STATE_WAITING     = 1;
+    static const int STATE_CONNECTING  = 2;
+    static const int STATE_OPERATIONAL = 3;
+    static const int STATE_FAILED      = 4;
+    static const int STATE_CLOSED      = 5;
+    static const int STATE_PASSIVE     = 6;
+
+    static const uint32_t MAX_INTERVAL = 32;
+
+    void setStateLH (int newState);
+    void startConnectionLH();        // Start the IO Connection
+    void destroy();                  // Called when mgmt deletes this link
+    void ioThreadProcessing();       // Called on connection's IO thread by request
+    bool tryFailoverLH();            // Called during maintenance visit
+    bool hideManagement() const;
+
+  public:
+    typedef boost::shared_ptr<Link> shared_ptr;
+
+    Link(LinkRegistry* links,
+         MessageStore* store,
+         const std::string&       host,
+         uint16_t      port,
+         const std::string&       transport,
+         bool          durable,
+         const std::string&       authMechanism,
+         const std::string&       username,
+         const std::string&       password,
+         Broker*       broker,
+         management::Manageable* parent = 0);
+    virtual ~Link();
+
+    std::string getHost() { return host; }
+    uint16_t    getPort() { return port; }
+    std::string getTransport() { return transport; }
+
+    bool isDurable() { return durable; }
+    void maintenanceVisit ();
+    uint nextChannel();
+    void add(Bridge::shared_ptr);
+    void cancel(Bridge::shared_ptr);
+    void setUrl(const Url&); // Set URL for reconnection.
+
+    void established(); // Called when connection is create
+    void opened();      // Called when connection is open (after create)
+    void closed(int, std::string);   // Called when connection goes away
+    void setConnection(Connection*); // Set pointer to the AMQP Connection
+    void reconnect(const Address&); //called by LinkRegistry
+    void close();       // Close the link from within the broker.
+
+    std::string getAuthMechanism() { return authMechanism; }
+    std::string getUsername()      { return username; }
+    std::string getPassword()      { return password; }
+    Broker* getBroker()       { return broker; }
+
+    void notifyConnectionForced(const std::string text);
+    void setPassive(bool p);
+
+    // PersistableConfig:
+    void     setPersistenceId(uint64_t id) const;
+    uint64_t getPersistenceId() const { return persistenceId; }
+    uint32_t encodedSize() const;
+    void     encode(framing::Buffer& buffer) const;
+    const std::string& getName() const;
+
+    static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+
+    // Manageable entry points
+    management::ManagementObject*    GetManagementObject(void) const;
+    management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);
+
+};
+}
 }
 
 

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1243577&r1=1243576&r2=1243577&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Mon Feb 13 16:17:52
2012
@@ -35,15 +35,12 @@ using boost::format;
 using boost::str;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
-#define LINK_MAINT_INTERVAL 2
-
 // TODO: This constructor is only used by the store unit tests -
 // That probably indicates that LinkRegistry isn't correctly
-// factored: The persistence element and maintenance element
-// should be factored separately
+// factored: The persistence element should be factored separately
 LinkRegistry::LinkRegistry () :
-    broker(0), timer(0),
-    parent(0), store(0), passive(false), passiveChanged(false),
+    broker(0),
+    parent(0), store(0), passive(false),
     realm("")
 {
 }
@@ -60,79 +57,32 @@ struct ConnectionObserverImpl : public C
 }
 
 LinkRegistry::LinkRegistry (Broker* _broker) :
-    broker(_broker), timer(&broker->getTimer()),
-    maintenanceTask(new Periodic(*this)),
-    parent(0), store(0), passive(false), passiveChanged(false),
+    broker(_broker),
+    parent(0), store(0), passive(false),
     realm(broker->getOptions().realm)
 {
-    timer->add(maintenanceTask);
     broker->getConnectionObservers().add(
         boost::shared_ptr<ConnectionObserver>(new ConnectionObserverImpl(*this)));
 }
 
-LinkRegistry::~LinkRegistry()
-{
-    // This test is only necessary if the default constructor above is present
-    if (maintenanceTask)
-        maintenanceTask->cancel();
-}
-
-LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
-    TimerTask (Duration (LINK_MAINT_INTERVAL * TIME_SEC),"LinkRegistry"), links(_links) {}
-
-void LinkRegistry::Periodic::fire ()
-{
-    links.periodicMaintenance ();
-    setupNextFire();
-    links.timer->add(this);
-}
-
-void LinkRegistry::periodicMaintenance ()
-{
-    Mutex::ScopedLock locker(lock);
+LinkRegistry::~LinkRegistry() {}
 
-    linksToDestroy.clear();
-    bridgesToDestroy.clear();
-    if (passiveChanged) {
-        if (passive) { QPID_LOG(info, "Passivating links"); }
-        else { QPID_LOG(info, "Activating links"); }
-        for (LinkMap::iterator i = links.begin(); i != links.end(); i++) {
-            i->second->setPassive(passive);
-        }
-        passiveChanged = false;
-    }
-    for (LinkMap::iterator i = links.begin(); i != links.end(); i++)
-        i->second->maintenanceVisit();
-    //now process any requests for re-addressing
-    for (AddressMap::iterator i = reMappings.begin(); i != reMappings.end(); i++)
-        updateAddress(i->first, i->second);
-    reMappings.clear();
-}
 
 void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address&
newAddress)
 {
-    //done on periodic maintenance thread; hold changes in separate
-    //map to avoid modifying the link map that is iterated over
-    reMappings[createKey(oldAddress)] = newAddress;
-}
-
-bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::Address&
newAddress)
-{
+    Mutex::ScopedLock   locker(lock);
+    std::string oldKey = createKey(oldAddress);
     std::string newKey = createKey(newAddress);
     if (links.find(newKey) != links.end()) {
         QPID_LOG(error, "Attempted to update key from " << oldKey << " to " <<
newKey << " which is already in use");
-        return false;
     } else {
         LinkMap::iterator i = links.find(oldKey);
         if (i == links.end()) {
             QPID_LOG(error, "Attempted to update key from " << oldKey << " which
does not exist, to " << newKey);
-            return false;
         } else {
             links[newKey] = i->second;
-            i->second->reconnect(newAddress);
             links.erase(oldKey);
             QPID_LOG(info, "Updated link key from " << oldKey << " to " <<
newKey);
-            return true;
         }
     }
 }
@@ -230,7 +180,6 @@ void LinkRegistry::destroy(const string&
     {
         if (i->second->isDurable() && store)
             store->destroy(*(i->second));
-        linksToDestroy[key] = i->second;
         links.erase(i);
     }
 }
@@ -258,7 +207,6 @@ void LinkRegistry::destroy(const std::st
     l->second->cancel(b->second);
     if (b->second->isDurable())
         store->destroy(*(b->second));
-    bridgesToDestroy[bridgeKey] = b->second;
     bridges.erase(b);
 }
 
@@ -406,9 +354,12 @@ std::string LinkRegistry::createKey(cons
 void LinkRegistry::setPassive(bool p)
 {
     Mutex::ScopedLock locker(lock);
-    passiveChanged = p != passive;
     passive = p;
-    //will activate or passivate links on maintenance visit
+    if (passive) { QPID_LOG(info, "Passivating links"); }
+    else { QPID_LOG(info, "Activating links"); }
+    for (LinkMap::iterator i = links.begin(); i != links.end(); i++) {
+        i->second->setPassive(passive);
+    }
 }
 
 void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f)
{

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=1243577&r1=1243576&r2=1243577&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.h Mon Feb 13 16:17:52
2012
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,7 +27,6 @@
 #include "qpid/broker/MessageStore.h"
 #include "qpid/Address.h"
 #include "qpid/sys/Mutex.h"
-#include "qpid/sys/Timer.h"
 #include "qpid/management/Manageable.h"
 #include <boost/shared_ptr.hpp>
 #include <boost/intrusive_ptr.hpp>
@@ -40,40 +39,19 @@ namespace broker {
     class Broker;
     class Connection;
     class LinkRegistry {
-
-        // Declare a timer task to manage the establishment of link connections and the
-        // re-establishment of lost link connections.
-        struct Periodic : public sys::TimerTask
-        {
-            LinkRegistry& links;
-
-            Periodic(LinkRegistry& links);
-            virtual ~Periodic() {};
-            void fire();
-        };
-
         typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap;
         typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
-        typedef std::map<std::string, Address> AddressMap;
 
         LinkMap   links;
-        LinkMap   linksToDestroy;
         BridgeMap bridges;
-        BridgeMap bridgesToDestroy;
-        AddressMap reMappings;
 
         qpid::sys::Mutex lock;
         Broker* broker;
-        sys::Timer* timer;
-        boost::intrusive_ptr<qpid::sys::TimerTask> maintenanceTask;
         management::Manageable* parent;
         MessageStore* store;
         bool passive;
-        bool passiveChanged;
         std::string realm;
 
-        void periodicMaintenance ();
-        bool updateAddress(const std::string& oldKey, const Address& newAddress);
         boost::shared_ptr<Link> findLink(const std::string& key);
         static std::string createKey(const Address& address);
         static std::string createKey(const std::string& host, uint16_t port);
@@ -147,6 +125,7 @@ namespace broker {
          * Called by links failing over to new address
          */
         void changeAddress(const Address& oldAddress, const Address& newAddress);
+
         /**
          * Called to alter passive state. In passive state the links
          * and bridges managed by a link registry will be recorded and
@@ -155,7 +134,7 @@ namespace broker {
          */
         void setPassive(bool);
 
-        
+
         /** Iterate over each link in the registry. Used for cluster updates. */
         void eachLink(boost::function<void(boost::shared_ptr<Link>)> f);
         /** Iterate over each bridge in the registry. Used for cluster updates. */

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/RetryList.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/RetryList.h?rev=1243577&r1=1243576&r2=1243577&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/RetryList.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/RetryList.h Mon Feb 13 16:17:52 2012
@@ -23,7 +23,6 @@
  */
 
 #include "qpid/broker/BrokerImportExport.h"
-#include "qpid/Address.h"
 #include "qpid/Url.h"
 
 namespace qpid {
@@ -36,7 +35,7 @@ namespace broker {
 class RetryList
 {
   public:
-    QPID_BROKER_EXTERN RetryList();                
+    QPID_BROKER_EXTERN RetryList();
     QPID_BROKER_EXTERN void reset(const std::vector<Url>& urls);
     QPID_BROKER_EXTERN bool next(Address& address);
   private:

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1243577&r1=1243576&r2=1243577&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp Mon Feb 13 16:17:52 2012
@@ -46,32 +46,39 @@ using std::string;
 Backup::Backup(broker::Broker& b, const Settings& s) :
     broker(b), settings(s), excluder(new ConnectionExcluder())
 {
-    Url url(s.brokerUrl);
-    string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+    if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
+}
 
+void Backup::initialize(const Url& url) {
+    QPID_LOG(notice, "Ha: Backup started: " << url);
+    string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
     // Declare the link
     std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
         url[0].host, url[0].port, protocol,
         false,              // durable
-        s.mechanism, s.username, s.password);
+        settings.mechanism, settings.username, settings.password);
     assert(result.second);  // FIXME aconway 2011-11-23: error handling
     link = result.first;
-    link->setUrl(Url(s.brokerUrl));
+    link->setUrl(url);
 
     replicator.reset(new BrokerReplicator(link));
     broker.getExchanges().registerExchange(replicator);
-
     broker.getConnectionObservers().add(excluder);
 }
 
 void Backup::setUrl(const Url& url) {
     sys::Mutex::ScopedLock l(lock);
-    link->setUrl(url);
+    if (!replicator.get())
+        initialize(url);
+    else {
+        QPID_LOG(info, "HA: Backup URL set to " << url);
+        link->setUrl(url);
+    }
 }
 
 Backup::~Backup() {
-    link->close();
-    broker.getExchanges().destroy(replicator->getName());
+    if (link) link->close();
+    if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
     broker.getConnectionObservers().remove(excluder); // Allows client connections.
 }
 

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h?rev=1243577&r1=1243576&r2=1243577&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h Mon Feb 13 16:17:52 2012
@@ -52,6 +52,8 @@ class Backup
     void setUrl(const Url&);
 
   private:
+    void initialize(const Url&);
+
     sys::Mutex lock;
     broker::Broker& broker;
     Settings settings;

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1243577&r1=1243576&r2=1243577&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon Feb 13 16:17:52 2012
@@ -41,7 +41,9 @@ using namespace std;
 namespace {
 Url url(const std::string& s, const std::string& id) {
     try {
-        return Url(s);
+        // Allow the URL to be empty, used in tests that set the URL
+        // after starting broker
+        return s.empty() ? Url() : Url(s);
     } catch (const std::exception& e) {
         throw Exception(Msg() << "Invalid URL for " << id << ": '" <<
s << "'");
     }
@@ -55,15 +57,9 @@ const std::string BACKUP="backup";
 HaBroker::HaBroker(broker::Broker& b, const Settings& s)
     : broker(b),
       settings(s),
-      brokerUrl(url(s.brokerUrl, "ha-broker-url")),
-      clientUrl(s.clientUrl.empty() ? brokerUrl : url(s.clientUrl, "ha-client-url")),
       backup(new Backup(b, s)),
       mgmtObject(0)
 {
-    // Note all HA brokers start out in backup mode.
-    QPID_LOG(notice, "HA: Backup initialized: "
-             << " broker-url=" << brokerUrl
-             << " client-url=" << clientUrl);
     // Register a factory for replicating subscriptions.
     broker.getConsumerFactories().add(
         boost::shared_ptr<ReplicatingSubscription::Factory>(
@@ -96,15 +92,17 @@ Manageable::status_t HaBroker::Managemen
           break;
       }
       case _qmf::HaBroker::METHOD_SETCLIENTADDRESSES: {
-          clientUrl = dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args).i_clientAddresses;
-          mgmtObject->set_clientAddresses(clientUrl.str());
+          string url = dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args)
+              .i_clientAddresses;
+          mgmtObject->set_clientAddresses(url);
           // FIXME aconway 2012-01-30: upate status for new URL
           break;
       }
       case _qmf::HaBroker::METHOD_SETBROKERADDRESSES: {
-          brokerUrl = dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args).i_brokerAddresses;
-          mgmtObject->set_brokerAddresses(brokerUrl.str());
-          if (backup.get()) backup->setUrl(brokerUrl);
+          string url = dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args)
+              .i_brokerAddresses;
+          mgmtObject->set_brokerAddresses(url);
+          if (backup.get()) backup->setUrl(Url(url));
           break;
       }
       default:

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1243577&r1=1243576&r2=1243577&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h Mon Feb 13 16:17:52 2012
@@ -57,7 +57,6 @@ class HaBroker : public management::Mana
     sys::Mutex lock;
     broker::Broker& broker;
     Settings settings;
-    Url brokerUrl, clientUrl;
     std::auto_ptr<Backup> backup;
     qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
 };

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1243577&r1=1243576&r2=1243577&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Mon Feb 13 16:17:52 2012
@@ -30,11 +30,12 @@ log = getLogger("qpid.ha-tests")
 class HaBroker(Broker):
     def __init__(self, test, args=[], broker_url=None, **kwargs):
         assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
-        Broker.__init__(self, test,
-                        args=["--load-module", BrokerTest.ha_lib,
-                              "--ha-enable=yes",
-                              "--ha-broker-url", broker_url ],
-                        **kwargs)
+        args=["--load-module", BrokerTest.ha_lib,
+              "--log-enable=debug+:ha::", # FIXME aconway 2012-01-31:
+              "--log-enable=debug+:Link",
+              "--ha-enable=yes"]
+        if broker_url: args += [ "--ha-broker-url", broker_url ]
+        Broker.__init__(self, test, args, **kwargs)
 
     def promote(self):
         assert os.system("qpid-ha-tool --promote %s"%(self.host_port())) == 0
@@ -237,13 +238,13 @@ class ShortTests(BrokerTest):
 
     def test_failover(self):
         """Verify that backups rejects connections and that fail-over works in python client"""
-        getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover
+        getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages
         primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
         primary.promote()
         backup = HaBroker(self, name="backup", broker_url=primary.host_port())
         # Check that backup rejects normal connections
         try:
-            backup.connect()
+            backup.connect().session()
             self.fail("Expected connection to backup to fail")
         except ConnectionError: pass
         # Check that admin connections are allowed to backup.
@@ -284,6 +285,24 @@ class ShortTests(BrokerTest):
         sender.stop()
         receiver.stop()
 
+    def test_backup_failover(self):
+        # FIXME aconway 2012-01-30: UNFINISHED
+        brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
+                    for name in ["a","b","c"] ]
+        url = ",".join([b.host_port() for b in brokers])
+        for b in brokers: b.set_broker_url(url)
+        brokers[0].promote()
+        brokers[0].connect().session().sender(
+            "q;{create:always,%s}"%(self.qpid_replicate())).send("a")
+        for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
+        # FIXME aconway 2012-01-30: failing - not using set URL?
+        brokers[0].kill()
+        brokers[2].promote()            # c must fail over to b.
+        brokers[2].connect().session().sender("q").send("b")
+        self.assert_browse_backup(brokers[1], "q", ["a","b"])
+        # FIXME aconway 2012-01-30: finish
+        for b in brokers[1:]: b.kill()
+
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)
     os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/reliable_replication_test
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/reliable_replication_test?rev=1243577&r1=1243576&r2=1243577&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/reliable_replication_test (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/reliable_replication_test Mon Feb 13 16:17:52
2012
@@ -66,7 +66,6 @@ receive() {
 
 bounce_link() {
     $PYTHON_COMMANDS/qpid-route link del "localhost:$BROKER_B" "localhost:$BROKER_A"
-#    sleep 2
     $PYTHON_COMMANDS/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A"
replication replication
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message