qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1348113 - in /qpid/trunk/qpid/cpp/src: ./ qpid/ha/ tests/
Date Fri, 08 Jun 2012 15:24:19 GMT
Author: aconway
Date: Fri Jun  8 15:24:18 2012
New Revision: 1348113

URL: http://svn.apache.org/viewvc?rev=1348113&view=rev
Log:
QPID-3603: HA primary sends membership updates to backup brokers.

Added:
    qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h
      - copied, changed from r1348090, qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
Modified:
    qpid/trunk/qpid/cpp/src/ha.mk
    qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/management-schema.xml
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/ha.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/ha.mk?rev=1348113&r1=1348112&r2=1348113&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/ha.mk (original)
+++ qpid/trunk/qpid/cpp/src/ha.mk Fri Jun  8 15:24:18 2012
@@ -39,6 +39,8 @@ ha_la_SOURCES =					\
   qpid/ha/HaPlugin.cpp				\
   qpid/ha/LogPrefix.cpp				\
   qpid/ha/LogPrefix.h				\
+  qpid/ha/Membership.cpp			\
+  qpid/ha/Membership.h				\
   qpid/ha/Primary.cpp				\
   qpid/ha/Primary.h				\
   qpid/ha/QueueReplicator.cpp			\

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1348113&r1=1348112&r2=1348113&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Fri Jun  8 15:24:18 2012
@@ -75,7 +75,7 @@ void Backup::initialize(const Url& broke
     sys::Mutex::ScopedLock l(lock);
     Url url = linkUrl(brokers);
     string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
-    framing::Uuid uuid(true);
+    types::Uuid uuid(true);
     // Declare the link
     std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
         broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp?rev=1348113&r1=1348112&r2=1348113&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp Fri Jun  8 15:24:18 2012
@@ -20,8 +20,11 @@
  */
 
 #include "BrokerInfo.h"
+#include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/Exception.h"
 #include "qpid/log/Statement.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
 #include <iostream>
 
 
@@ -35,23 +38,46 @@ std::string PORT="port";
 std::string STATUS="status";
 }
 
-using framing::Uuid;
+using types::Uuid;
+using types::Variant;
 using framing::FieldTable;
 
 FieldTable BrokerInfo::asFieldTable() const {
+    Variant::Map m = asMap();
     FieldTable ft;
-    ft.setString(SYSTEM_ID, systemId.str());
-    ft.setString(HOST_NAME, hostName);
-    ft.setInt(PORT, port);
-    ft.setInt(STATUS, status);
+    amqp_0_10::translate(m, ft);
     return ft;
 }
 
+Variant::Map BrokerInfo::asMap() const {
+    Variant::Map m;
+    m[SYSTEM_ID] = systemId;
+    m[HOST_NAME] = hostName;
+    m[PORT] = port;
+    m[STATUS] = status;
+    return m;
+}
+
 void BrokerInfo::assign(const FieldTable& ft) {
-    systemId = Uuid(ft.getAsString(SYSTEM_ID));
-    hostName = ft.getAsString(HOST_NAME);
-    port = ft.getAsInt(PORT);
-    status = BrokerStatus(ft.getAsInt(STATUS));
+    Variant::Map m;
+    amqp_0_10::translate(ft, m);
+    assign(m);
+}
+
+namespace {
+const Variant& get(const Variant::Map& m, const std::string& k) {
+    Variant::Map::const_iterator i = m.find(k);
+    if (i == m.end()) throw Exception(
+        QPID_MSG("Missing field '" << k << "' in broker information"));
+    return i->second;
+}
+}
+
+void BrokerInfo::assign(const Variant::Map& m) {
+    systemId = get(m, SYSTEM_ID).asUuid();
+    hostName = get(m, HOST_NAME).asString();
+    port = get(m, PORT).asUint16();
+    status = BrokerStatus(get(m, STATUS).asUint8());
 }
 
 std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h?rev=1348113&r1=1348112&r2=1348113&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h Fri Jun  8 15:24:18 2012
@@ -24,8 +24,9 @@
 
 #include "Enum.h"
 #include "qpid/Url.h"
-#include "qpid/framing/Uuid.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/types/Uuid.h"
+#include "qpid/types/Variant.h"
 #include <string>
 #include <iosfwd>
 
@@ -38,24 +39,29 @@ namespace ha {
 class BrokerInfo
 {
   public:
-    BrokerInfo(const std::string& host, uint16_t port_, const framing::Uuid& id)
:
+    BrokerInfo() {}
+    BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) :
         hostName(host), port(port_), systemId(id) {}
-
     BrokerInfo(const framing::FieldTable& ft) { assign(ft); }
-    framing::FieldTable asFieldTable() const;
-    void assign(const framing::FieldTable&);
+    BrokerInfo(const types::Variant::Map& m) { assign(m); }
 
-    framing::Uuid getSystemId() const { return systemId; }
+    types::Uuid getSystemId() const { return systemId; }
     std::string getHostName() const { return hostName; }
     BrokerStatus getStatus() const { return status; }
      uint16_t getPort() const { return port; }
 
     void setStatus(BrokerStatus s)  { status = s; }
 
+    framing::FieldTable asFieldTable() const;
+    types::Variant::Map asMap() const;
+
+    void assign(const framing::FieldTable&);
+    void assign(const types::Variant::Map&);
+
   private:
     std::string hostName;
     uint16_t port;
-    framing::Uuid systemId;
+    types::Uuid systemId;
     BrokerStatus status;
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1348113&r1=1348112&r2=1348113&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Jun  8 15:24:18 2012
@@ -13,7 +13,7 @@
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+* KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  *
@@ -37,6 +37,7 @@
 #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
 #include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
 #include "qmf/org/apache/qpid/broker/EventSubscribe.h"
+#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
 #include <algorithm>
 #include <sstream>
 #include <assert.h>
@@ -51,6 +52,7 @@ using qmf::org::apache::qpid::broker::Ev
 using qmf::org::apache::qpid::broker::EventQueueDeclare;
 using qmf::org::apache::qpid::broker::EventQueueDelete;
 using qmf::org::apache::qpid::broker::EventSubscribe;
+using qmf::org::apache::qpid::ha::EventMembersUpdate;
 using namespace framing;
 using std::string;
 using types::Variant;
@@ -93,7 +95,8 @@ const string TYPE("type");
 const string USER("user");
 const string HA_BROKER("habroker");
 
-const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
+const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#");
+const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#");
 const string QMF2("qmf2");
 const string QMF_CONTENT("qmf.content");
 const string QMF_DEFAULT_TOPIC("qmf.default.topic");
@@ -109,6 +112,7 @@ const string ORG_APACHE_QPID_HA("org.apa
 const string QMF_DEFAULT_DIRECT("qmf.default.direct");
 const string _QUERY_REQUEST("_query_request");
 const string BROKER("broker");
+const string MEMBERS("members");
 
 bool isQMFv2(const Message& message) {
     const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>();
@@ -169,7 +173,7 @@ BrokerReplicator::BrokerReplicator(HaBro
       logPrefix(hb),
       haBroker(hb), broker(hb.getBroker()), link(l)
 {
-    framing::Uuid uuid(true);
+    types::Uuid uuid(true);
     const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str());
     broker.getLinks().declare(
         name,               // name for bridge
@@ -221,7 +225,8 @@ void BrokerReplicator::initializeBridge(
     FieldTable declareArgs;
     declareArgs.setString(QPID_REPLICATE, printable(NONE).str());
     peer.getQueue().declare(queueName, "", false, false, true, true, declareArgs);
-    peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER,
FieldTable());
+    peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_BROKER, FieldTable());
+    peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_HA, FieldTable());
     //subscribe to the queue
     peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
     peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
@@ -256,6 +261,7 @@ void BrokerReplicator::route(Deliverable
                 else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
                 else if (match<EventBind>(schema)) doEventBind(values);
                 else if (match<EventUnbind>(schema)) doEventUnbind(values);
+                else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values);
             }
         } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
@@ -423,6 +429,11 @@ void BrokerReplicator::doEventUnbind(Var
     }
 }
 
+void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
+    Variant::List members = values[MEMBERS].asList();
+    haBroker.getMembership().assign(members);
+}
+
 void BrokerReplicator::doResponseQueue(Variant::Map& values) {
     Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
     if (!isReplicated(values[ARGUMENTS].asMap(),
@@ -517,15 +528,14 @@ const string REPLICATE_DEFAULT="replicat
 void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
     try {
         ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
-        ReplicateLevel primary = haBroker.replicateLevel(values[REPLICATE_DEFAULT].asString());
-        if (mine != primary) {
-            QPID_LOG(critical, logPrefix << "Replicate default on backup (" <<
mine
-                     << ") does not match primary (" <<  primary << ")");
-            haBroker.shutdown();
-        }
+        ReplicateLevel primary = haBroker.replicateLevel(
+            values[REPLICATE_DEFAULT].asString());
+        if (mine != primary)
+            throw Exception(QPID_MSG("Replicate default on backup (" << mine
+                                     << ") does not match primary (" <<  primary
<< ")"));
+        haBroker.getMembership().assign(values[MEMBERS].asList());
     } catch (const std::exception& e) {
-        QPID_LOG(critical, logPrefix << "Invalid replicate default from primary: "
-                 << e.what());
+        QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what());
         haBroker.shutdown();
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1348113&r1=1348112&r2=1348113&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Fri Jun  8 15:24:18 2012
@@ -81,6 +81,7 @@ class BrokerReplicator : public broker::
     void doEventExchangeDelete(types::Variant::Map& values);
     void doEventBind(types::Variant::Map&);
     void doEventUnbind(types::Variant::Map&);
+    void doEventMembersUpdate(types::Variant::Map&);
 
     void doResponseQueue(types::Variant::Map& values);
     void doResponseExchange(types::Variant::Map& values);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp?rev=1348113&r1=1348112&r2=1348113&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp Fri Jun  8 15:24:18 2012
@@ -21,6 +21,7 @@
 
 #include "ConnectionExcluder.h"
 #include "BrokerInfo.h"
+#include "HaBroker.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/broker/Connection.h"
 #include <boost/function.hpp>
@@ -29,8 +30,19 @@
 namespace qpid {
 namespace ha {
 
-ConnectionExcluder::ConnectionExcluder(const LogPrefix& lp, const framing::Uuid&
uuid)
-    : logPrefix(lp), backupAllowed(false), self(uuid) {}
+ConnectionExcluder::ConnectionExcluder(HaBroker& hb, const types::Uuid& uuid)
+    : haBroker(hb), logPrefix(hb), self(uuid) {}
+
+namespace {
+bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info) {
+    framing::FieldTable ft;
+    if (connection.getClientProperties().getTable(ConnectionExcluder::BACKUP_TAG, ft)) {
+        info = BrokerInfo(ft);
+        return true;
+    }
+    return false;
+}
+}
 
 void ConnectionExcluder::opened(broker::Connection& connection) {
     if (connection.isLink()) return; // Allow all outgoing links
@@ -39,23 +51,37 @@ void ConnectionExcluder::opened(broker::
                  << connection.getMgmtId());
         return;
     }
-    framing::FieldTable ft;
-    if (connection.getClientProperties().getTable(BACKUP_TAG, ft)) {
-        BrokerInfo info(ft);
+    BrokerStatus status = haBroker.getStatus();
+    if (isBackup(status)) reject(connection);
+    BrokerInfo info;            // Get info about a connecting backup.
+    if (getBrokerInfo(connection, info)) {
         if (info.getSystemId() == self) {
-            QPID_LOG(debug, logPrefix << "Self connection rejected");
+            QPID_LOG(debug, logPrefix << "Rejected self connection");
+            reject(connection);
         }
         else {
-            QPID_LOG(debug, logPrefix << "Backup connection " << info <<
-                     (backupAllowed ? " allowed" : " rejected"));
-            if (backupAllowed) return;
+            QPID_LOG(debug, logPrefix << "Allowed backup connection " << info);
+            haBroker.getMembership().add(info);
+            return;
         }
     }
-    // Abort the connection.
+    else // This is a client connection.
+        if (status == RECOVERING) reject(connection); // FIXME aconway 2012-05-29: allow
clients in recovery
+}
+
+void ConnectionExcluder::reject(broker::Connection& connection) {
     throw Exception(
         QPID_MSG(logPrefix << "Rejected connection " << connection.getMgmtId()));
 }
 
+void ConnectionExcluder::closed(broker::Connection& connection) {
+    BrokerInfo info;
+    BrokerStatus status = haBroker.getStatus();
+    if (isBackup(status)) return; // Don't mess with the map received from primary.
+    if (getBrokerInfo(connection, info))
+        haBroker.getMembership().remove(info.getSystemId());
+}
+
 const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin";
 const std::string ConnectionExcluder::BACKUP_TAG="qpid.ha-backup";
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionExcluder.h?rev=1348113&r1=1348112&r2=1348113&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionExcluder.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionExcluder.h Fri Jun  8 15:24:18 2012
@@ -25,6 +25,7 @@
 #include "LogPrefix.h"
 #include "qpid/broker/ConnectionObserver.h"
 #include "qpid/framing/Uuid.h"
+#include "qpid/sys/Mutex.h"
 #include <boost/function.hpp>
 
 namespace qpid {
@@ -46,17 +47,19 @@ class ConnectionExcluder : public broker
     static const std::string ADMIN_TAG;
     static const std::string BACKUP_TAG;
 
-    ConnectionExcluder(const LogPrefix&, const framing::Uuid& self);
+    ConnectionExcluder(HaBroker&, const types::Uuid& self);
 
     void opened(broker::Connection& connection);
+    void closed(broker::Connection& connection);
 
-    void setBackupAllowed(bool set) { backupAllowed = set; }
-    bool isBackupAllowed() const { return backupAllowed; }
+    void setStatus(BrokerStatus);
 
   private:
+    void reject(broker::Connection&);
+
+    HaBroker& haBroker;
     LogPrefix logPrefix;
-    bool backupAllowed;
-    framing::Uuid self;
+    types::Uuid self;
 };
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1348113&r1=1348112&r2=1348113&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Jun  8 15:24:18 2012
@@ -33,11 +33,14 @@
 #include "qpid/framing/FieldTable.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/sys/SystemInfo.h"
+#include "qpid/types/Uuid.h"
+#include "qpid/framing/Uuid.h"
 #include "qmf/org/apache/qpid/ha/Package.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h"
+#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
 #include "qpid/log/Statement.h"
 
 namespace qpid {
@@ -50,14 +53,15 @@ using namespace std;
 HaBroker::HaBroker(broker::Broker& b, const Settings& s)
     : logPrefix(status),
       broker(b),
+      systemId(broker.getSystem()->getSystemId().data()),
       settings(s),
       mgmtObject(0),
       status(STANDALONE),
-      excluder(new ConnectionExcluder(logPrefix, broker.getSystem()->getSystemId())),
+      excluder(new ConnectionExcluder(*this, systemId)),
       brokerInfo(broker.getSystem()->getNodeName(),
                  // TODO aconway 2012-05-24: other transports?
-                 broker.getPort(broker::Broker::TCP_TRANSPORT),
-                 broker.getSystem()->getSystemId())
+                 broker.getPort(broker::Broker::TCP_TRANSPORT), systemId),
+      membership(logPrefix, boost::bind(&HaBroker::membershipUpdate, this, _1))
 
 {
     // Set up the management object.
@@ -67,6 +71,7 @@ HaBroker::HaBroker(broker::Broker& b, co
     _qmf::Package  packageInit(ma);
     mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
     mgmtObject->set_replicateDefault(settings.replicateDefault.str());
+    mgmtObject->set_systemId(systemId);
     ma->addObject(mgmtObject);
 
     // Register a factory for replicating subscriptions.
@@ -92,11 +97,14 @@ HaBroker::HaBroker(broker::Broker& b, co
     QPID_LOG(notice, logPrefix << "Broker starting on " << brokerInfo);
 }
 
-HaBroker::~HaBroker() {}
+HaBroker::~HaBroker() {
+    broker.getConnectionObservers().remove(excluder);
+}
 
 void HaBroker::recover(sys::Mutex::ScopedLock&) {
     setStatus(RECOVERING);
     backup.reset();                    // No longer replicating, close link.
+    membership.reset(brokerInfo);
     primary.reset(new Primary(*this)); // Starts primary-ready check.
 }
 
@@ -107,9 +115,11 @@ void HaBroker::activate() {
 }
 
 void HaBroker::activate(sys::Mutex::ScopedLock&) {
+    BrokerStatus oldStatus = status;
     setStatus(ACTIVE);
+    if (oldStatus != RECOVERING)   // Already set membership
+        membership.reset(brokerInfo);
     backup.reset();                    // No longer replicating, close link.
-    broker.getConnectionObservers().remove(excluder); // This allows client connections.
 }
 
 ReplicateLevel HaBroker::replicateLevel(const std::string& str) {
@@ -173,7 +183,7 @@ Manageable::status_t HaBroker::Managemen
           boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
           Url url(bq_args.i_broker);
           string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
-          framing::Uuid uuid(true);
+          types::Uuid uuid(true);
           std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare(
               broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
               url[0].host, url[0].port, protocol,
@@ -282,6 +292,12 @@ void HaBroker::statusChanged(sys::Mutex:
     setLinkProperties(l);
 }
 
+void HaBroker::membershipUpdate(const types::Variant::List& brokers) {
+    QPID_LOG(debug, logPrefix << "Membership update: " << brokers);
+    mgmtObject->set_members(brokers);
+    broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
+}
+
 void HaBroker::setLinkProperties(sys::Mutex::ScopedLock&) {
     framing::FieldTable linkProperties = broker.getLinkClientProperties();
     if (isBackup(status)) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1348113&r1=1348112&r2=1348113&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Fri Jun  8 15:24:18 2012
@@ -23,6 +23,7 @@
  */
 
 #include "BrokerInfo.h"
+#include "Membership.h"
 #include "Enum.h"
 #include "LogPrefix.h"
 #include "Settings.h"
@@ -91,6 +92,8 @@ class HaBroker : public management::Mana
     boost::shared_ptr<ConnectionExcluder> getExcluder() { return excluder; }
 
     const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
+    Membership& getMembership() { return membership; }
+    void membershipUpdate(const types::Variant::List&);
 
   private:
     void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
@@ -110,6 +113,7 @@ class HaBroker : public management::Mana
 
     LogPrefix logPrefix;
     broker::Broker& broker;
+    types::Uuid systemId;
     const Settings settings;
 
     mutable sys::Mutex lock;
@@ -123,6 +127,7 @@ class HaBroker : public management::Mana
     QueueNames activeBackups;
     boost::shared_ptr<ConnectionExcluder> excluder;
     BrokerInfo brokerInfo;
+    Membership membership;
 };
 }} // namespace qpid::ha
 

Added: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp?rev=1348113&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp Fri Jun  8 15:24:18 2012
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Membership.h"
+
+namespace qpid {
+namespace ha {
+
+
+void Membership::reset(const BrokerInfo& b) {
+    {
+        sys::Mutex::ScopedLock l(lock);
+        brokers.clear();
+        brokers[b.getSystemId()] = b;
+    }
+    update();
+}
+
+void Membership::add(const BrokerInfo& b) {
+    {
+        sys::Mutex::ScopedLock l(lock);
+        brokers[b.getSystemId()] = b;
+    }
+    update();
+}
+
+
+void Membership::remove(const types::Uuid& id) {
+    {
+        sys::Mutex::ScopedLock l(lock);
+        BrokerMap::iterator i = brokers.find(id);
+        if (i != brokers.end())
+            brokers.erase(i);
+    }
+    update();
+}
+
+bool Membership::contains(const types::Uuid& id) {
+    sys::Mutex::ScopedLock l(lock);
+    return brokers.find(id) != brokers.end();
+}
+
+void Membership::assign(const types::Variant::List& list) {
+    {
+        sys::Mutex::ScopedLock l(lock);
+        brokers.clear();
+        for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i)
{
+            BrokerInfo b(i->asMap());
+            brokers[b.getSystemId()] = b;
+        }
+    }
+    update();
+}
+
+types::Variant::List Membership::asList() const {
+    sys::Mutex::ScopedLock l(lock);
+    types::Variant::List list;
+    for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
+        list.push_back(i->second.asMap());
+    return list;
+}
+
+void Membership::update() {
+    if (updateCallback) {
+        types::Variant::List list;
+        for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
+            list.push_back(i->second.asMap());
+        updateCallback(list);
+    }
+}
+
+}} // namespace qpid::ha

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h (from r1348090, qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionExcluder.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h?p2=qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h&p1=qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionExcluder.h&r1=1348090&r2=1348113&rev=1348113&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionExcluder.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h Fri Jun  8 15:24:18 2012
@@ -1,5 +1,5 @@
-#ifndef QPID_HA_CONNECTIONEXCLUDER_H
-#define QPID_HA_CONNECTIONEXCLUDER_H
+#ifndef QPID_HA_MEMBERSHIP_H
+#define QPID_HA_MEMBERSHIP_H
 
 /*
  *
@@ -22,43 +22,44 @@
  *
  */
 
+#include "BrokerInfo.h"
 #include "LogPrefix.h"
-#include "qpid/broker/ConnectionObserver.h"
 #include "qpid/framing/Uuid.h"
+#include "qpid/log/Statement.h"
+#include "qpid/types/Variant.h"
 #include <boost/function.hpp>
-
+#include <set>
+#include <vector>
 namespace qpid {
-
-namespace broker {
-class Connection;
-}
-
 namespace ha {
 
 /**
- * Exclude normal connections to a backup broker.
- * Admin connections are identified by a special flag in client-properties
- * during connection negotiation.
+ * Keep track of the brokers in the membership.
+ * THREAD SAFE: updated in arbitrary connection threads.
  */
-class ConnectionExcluder : public broker::ConnectionObserver
+class Membership
 {
   public:
-    static const std::string ADMIN_TAG;
-    static const std::string BACKUP_TAG;
+    Membership(LogPrefix lp, boost::function<void(const types::Variant::List&)>
updateFn)
+        : logPrefix(lp), updateCallback(updateFn) {}
 
-    ConnectionExcluder(const LogPrefix&, const framing::Uuid& self);
+    void reset(const BrokerInfo& b); ///< Reset to contain just one member.
+    void add(const BrokerInfo& b);
+    void remove(const types::Uuid& id);
+    bool contains(const types::Uuid& id);
 
-    void opened(broker::Connection& connection);
-
-    void setBackupAllowed(bool set) { backupAllowed = set; }
-    bool isBackupAllowed() const { return backupAllowed; }
+    void assign(const types::Variant::List&);
+    types::Variant::List asList() const;
 
   private:
+    typedef std::map<types::Uuid, BrokerInfo> BrokerMap;
+    void update();
+
+    mutable sys::Mutex lock;
     LogPrefix logPrefix;
-    bool backupAllowed;
-    framing::Uuid self;
+    BrokerMap brokers;
+    boost::function<void(const types::Variant::List&)> updateCallback;
 };
-
 }} // namespace qpid::ha
 
-#endif  /*!QPID_HA_CONNECTIONEXCLUDER_H*/
+#endif  /*!QPID_HA_MEMBERSHIP_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1348113&r1=1348112&r2=1348113&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Fri Jun  8 15:24:18 2012
@@ -57,8 +57,6 @@ Primary::Primary(HaBroker& b) :
         else {
             QPID_LOG(debug, logPrefix << "Waiting for  " << expected
                      << " backups on " << queues.size() << " queues");
-            // Allow backups to connect.
-            haBroker.getExcluder()->setBackupAllowed(true);
         }
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/management-schema.xml?rev=1348113&r1=1348112&r2=1348113&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/management-schema.xml (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/management-schema.xml Fri Jun  8 15:24:18 2012
@@ -34,9 +34,12 @@
     <property name="expectedBackups" type="uint16"
 	      desc="Number of HA backup brokers expected."/>
 
-    <property
-	name="replicateDefault" type="sstr"
-	desc="Replicate value for queues/exchanges without a qpid.replicate argument"/>
+    <property name="replicateDefault" type="sstr"
+	      desc="Replication for queues/exchanges with no qpid.replicate argument"/>
+
+    <property name="members" type="list" desc="List of brokers in the cluster"/>
+
+    <property name="systemId" type="uuid" desc="Identifies the system."/>
 
     <method name="promote" desc="Promote a backup broker to primary."/>
 
@@ -58,4 +61,10 @@
     </method>
   </class>
 
+  <eventArguments>
+    <arg name="members" type="list" desc="List of broker information maps"/>
+  </eventArguments>
+
+  <event name="membersUpdate" sev="inform" args="members"/>
+
 </schema>

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1348113&r1=1348112&r2=1348113&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Jun  8 15:24:18 2012
@@ -26,19 +26,24 @@ from brokertest import *
 from threading import Thread, Lock, Condition
 from logging import getLogger, WARN, ERROR, DEBUG
 from qpidtoollibs import BrokerAgent
+from uuid import UUID
 
 log = getLogger(__name__)
 
-class QmfHaBroker(object):
+class QmfAgent(object):
+    """Access to a QMF broker agent."""
     def __init__(self, address):
-        self.connection = Connection.establish(
+        self._connection = Connection.establish(
             address, client_properties={"qpid.ha-admin":1})
-        self.qmf = BrokerAgent(self.connection)
-        self.ha_broker = self.qmf.getHaBroker()
-        if not self.ha_broker:
-            raise Exception("HA module is not loaded on broker at %s"%address)
+        self._agent = BrokerAgent(self._connection)
+        assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address)
+
+    def __getattr__(self, name):
+        a = getattr(self._agent, name)
+        return a
 
 class HaBroker(Broker):
+    """Start a broker with HA enabled"""
     def __init__(self, test, args=[], brokers_url=None, ha_cluster=True,
                  ha_replicate="all", **kwargs):
         assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
@@ -58,6 +63,7 @@ class HaBroker(Broker):
         assert os.path.exists(self.qpid_config_path)
         getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
         self.qpid_ha_script=import_script(self.qpid_ha_path)
+        self._agent = None
 
     def qpid_ha(self, args): self.qpid_ha_script.main(["", "-b", self.host_port()]+args)
 
@@ -65,7 +71,11 @@ class HaBroker(Broker):
     def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
     def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
     def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
-    def ha_status(self): QmfHaBroker(self.host_port()).ha_broker.status
+    def agent(self):
+        if not self._agent: self._agent = QmfAgent(self.host_port())
+        return self._agent
+
+    def ha_status(self): self.agent().getHaBroker().status
 
     # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
     def qpid_config(self, args):
@@ -641,6 +651,31 @@ class ReplicationTests(BrokerTest):
         self.failIf(i < 0)
         self.assertEqual(log.find("caught up", i), -1)
 
+    def test_broker_info(self):
+        """Check that broker information is correctly published via management"""
+        cluster = HaCluster(self, 3)
+
+        for broker in cluster:  # Make sure HA system-id matches broker's
+            qmf = broker.agent().getHaBroker()
+            self.assertEqual(qmf.systemId, UUID(broker.agent().getBroker().systemRef))
+
+        cluster_ports = map(lambda b: b.port(), cluster)
+        cluster_ports.sort()
+        def ports(qmf):
+            qmf.update()
+            return sorted(map(lambda b: b["port"], qmf.members))
+        # Check that all brokers have the same membership as the cluster
+        for broker in cluster:
+            qmf = broker.agent().getHaBroker()
+            assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports,
ports(qmf))
+        # Add a new broker, check it is updated everywhere
+        b = cluster.start()
+        cluster_ports.append(b.port())
+        cluster_ports.sort()
+        for broker in cluster:
+            qmf = broker.agent().getHaBroker()
+            assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports,
ports(qmf))
+
 def fairshare(msgs, limit, levels):
     """
     Generator to return prioritised messages in expected order for a given fairshare limit



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message