qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1244023 - in /qpid/branches/qpid-3603-6/qpid/cpp/src: ./ qpid/broker/ qpid/cluster/ qpid/ha/ tests/
Date Tue, 14 Feb 2012 15:59:51 GMT
Author: aconway
Date: Tue Feb 14 15:59:49 2012
New Revision: 1244023

URL: http://svn.apache.org/viewvc?rev=1244023&view=rev
Log:
QPID-3603: Automatic wiring replication for HA.

Automatic replication of queues an exchanges. Bidnings TBD.

Get rid of long delay establishing connections:
- broker/Connection.cpp: requestIOProcessing() called before connection open saves work till
connection is open.
- broker/Link.cpp,LinkRegistry: Fix some const correctness errors.

Added:
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.cpp   (with props)
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.h
      - copied, changed from r1244022, qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h
    qpid/branches/qpid-3603-6/qpid/cpp/src/tests/ha_tests.py   (with props)
Modified:
    qpid/branches/qpid-3603-6/qpid/cpp/src/ha.mk
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.h
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.h
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.cpp
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.h
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaPlugin.cpp
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h
    qpid/branches/qpid-3603-6/qpid/cpp/src/tests/brokertest.py
    qpid/branches/qpid-3603-6/qpid/cpp/src/tests/test_env.sh.in

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/ha.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/ha.mk?rev=1244023&r1=1244022&r2=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/ha.mk (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/ha.mk Tue Feb 14 15:59:49 2012
@@ -26,6 +26,8 @@ ha_la_SOURCES =					\
   qpid/ha/HaPlugin.cpp				\
   qpid/ha/HaBroker.cpp				\
   qpid/ha/HaBroker.h				\
+  qpid/ha/Backup.cpp				\
+  qpid/ha/Backup.h				\
   qpid/ha/Settings.h
 
 ha_la_LIBADD = libqpidbroker.la

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1244023&r1=1244022&r2=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Connection.cpp Tue Feb 14 15:59:49
2012
@@ -130,7 +130,7 @@ void Connection::requestIOProcessing(boo
 {
     ScopedLock<Mutex> l(ioCallbackLock);
     ioCallbacks.push(callback);
-    out.activateOutput();
+    if (isOpen()) out.activateOutput();
 }
 
 Connection::~Connection()
@@ -156,11 +156,14 @@ Connection::~Connection()
 void Connection::received(framing::AMQFrame& frame) {
     // Received frame on connection so delay timeout
     restartTimeout();
+    bool wasOpen = isOpen();
     adapter.handle(frame);
     if (link) //i.e. we are acting as the client to another broker
         recordFromServer(frame);
     else
         recordFromClient(frame);
+    if (!wasOpen && isOpen())
+        doIoCallbacks(); // Do any callbacks registered before we opened.
 }
 
 void Connection::sent(const framing::AMQFrame& frame)
@@ -329,17 +332,16 @@ void Connection::closed(){ // Physically
 }
 
 void Connection::doIoCallbacks() {
-    {
-        ScopedLock<Mutex> l(ioCallbackLock);
-        // Although IO callbacks execute in the connection thread context, they are
-        // not cluster safe because they are queued for execution in non-IO threads.
-        ClusterUnsafeScope cus;
-        while (!ioCallbacks.empty()) {
-            boost::function0<void> cb = ioCallbacks.front();
-            ioCallbacks.pop();
-            ScopedUnlock<Mutex> ul(ioCallbackLock);
-            cb(); // Lend the IO thread for management processing
-        }
+    if (!isOpen()) return; // Don't process IO callbacks until we are open.
+    ScopedLock<Mutex> l(ioCallbackLock);
+    // Although IO callbacks execute in the connection thread context, they are
+    // not cluster safe because they are queued for execution in non-IO threads.
+    ClusterUnsafeScope cus;
+    while (!ioCallbacks.empty()) {
+        boost::function0<void> cb = ioCallbacks.front();
+        ioCallbacks.pop();
+        ScopedUnlock<Mutex> ul(ioCallbackLock);
+        cb(); // Lend the IO thread for management processing
     }
 }
 

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.cpp?rev=1244023&r1=1244022&r2=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.cpp Tue Feb 14 15:59:49 2012
@@ -47,13 +47,13 @@ namespace _qmf = qmf::org::apache::qpid:
 
 Link::Link(LinkRegistry*  _links,
            MessageStore*  _store,
-           string&        _host,
+           const string&        _host,
            uint16_t       _port,
-           string&        _transport,
+           const string&        _transport,
            bool           _durable,
-           string&        _authMechanism,
-           string&        _username,
-           string&        _password,
+           const string&        _authMechanism,
+           const string&        _username,
+           const string&        _password,
            Broker*        _broker,
            Manageable*    parent)
     : links(_links), store(_store), host(_host), port(_port),
@@ -79,6 +79,7 @@ Link::Link(LinkRegistry*  _links,
         }
     }
     setStateLH(STATE_WAITING);
+    startConnectionLH();
 }
 
 Link::~Link ()
@@ -213,28 +214,30 @@ void Link::add(Bridge::shared_ptr bridge
 {
     Mutex::ScopedLock mutex(lock);
     created.push_back (bridge);
+    if (connection)
+        connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+
 }
 
 void Link::cancel(Bridge::shared_ptr bridge)
 {
-    {
-        Mutex::ScopedLock mutex(lock);
+    Mutex::ScopedLock mutex(lock);
 
-        for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
-            if ((*i).get() == bridge.get()) {
-                created.erase(i);
-                break;
-            }
+    for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+        if ((*i).get() == bridge.get()) {
+            created.erase(i);
+            break;
         }
-        for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
-            if ((*i).get() == bridge.get()) {
-                cancellations.push_back(bridge);
-                bridge->closed();
-                active.erase(i);
-                break;
-            }
+    }
+    for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+        if ((*i).get() == bridge.get()) {
+            cancellations.push_back(bridge);
+            bridge->closed();
+            active.erase(i);
+            break;
         }
     }
+
     if (!cancellations.empty()) {
         connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
     }
@@ -284,6 +287,8 @@ void Link::setConnection(Connection* c)
     Mutex::ScopedLock mutex(lock);
     connection = c;
     updateUrls = true;
+    // Process any IO tasks bridges added before setConnection.
+    connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
 }
 
 void Link::maintenanceVisit ()
@@ -313,7 +318,7 @@ void Link::maintenanceVisit ()
     }
     else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() ||
!cancellations.empty()) && connection != 0)
         connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
-}
+    }
 
 void Link::reconnect(const qpid::Address& a)
 {

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.h?rev=1244023&r1=1244022&r2=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.h Tue Feb 14 15:59:49 2012
@@ -92,13 +92,13 @@ namespace qpid {
 
             Link(LinkRegistry* links,
                  MessageStore* store,
-                 std::string&       host,
+                 const std::string&       host,
                  uint16_t      port,
-                 std::string&       transport,
+                 const std::string&       transport,
                  bool          durable,
-                 std::string&       authMechanism,
-                 std::string&       username,
-                 std::string&       password,
+                 const std::string&       authMechanism,
+                 const std::string&       username,
+                 const std::string&       password,
                  Broker*       broker,
                  management::Manageable* parent = 0);
             virtual ~Link();

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1244023&r1=1244022&r2=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Tue Feb 14 15:59:49
2012
@@ -124,13 +124,13 @@ bool LinkRegistry::updateAddress(const s
     }
 }
 
-pair<Link::shared_ptr, bool> LinkRegistry::declare(string&  host,
+pair<Link::shared_ptr, bool> LinkRegistry::declare(const string&  host,
                                                    uint16_t port,
-                                                   string&  transport,
+                                                   const string&  transport,
                                                    bool     durable,
-                                                   string&  authMechanism,
-                                                   string&  username,
-                                                   string&  password)
+                                                   const string&  authMechanism,
+                                                   const string&  username,
+                                                   const string&  password)
 
 {
     Mutex::ScopedLock   locker(lock);
@@ -151,16 +151,16 @@ pair<Link::shared_ptr, bool> LinkRegistr
     return std::pair<Link::shared_ptr, bool>(i->second, false);
 }
 
-pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
                                                      uint16_t     port,
                                                      bool         durable,
-                                                     std::string& src,
-                                                     std::string& dest,
-                                                     std::string& key,
+                                                     const std::string& src,
+                                                     const std::string& dest,
+                                                     const std::string& key,
                                                      bool         isQueue,
                                                      bool         isLocal,
-                                                     std::string& tag,
-                                                     std::string& excludes,
+                                                     const std::string& tag,
+                                                     const std::string& excludes,
                                                      bool         dynamic,
                                                      uint16_t     sync)
 {

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=1244023&r1=1244022&r2=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.h Tue Feb 14 15:59:49
2012
@@ -84,24 +84,24 @@ namespace broker {
         ~LinkRegistry();
 
         std::pair<boost::shared_ptr<Link>, bool>
-            declare(std::string& host,
+            declare(const std::string& host,
                     uint16_t     port,
-                    std::string& transport,
+                    const std::string& transport,
                     bool         durable,
-                    std::string& authMechanism,
-                    std::string& username,
-                    std::string& password);
+                    const std::string& authMechanism,
+                    const std::string& username,
+                    const std::string& password);
         std::pair<Bridge::shared_ptr, bool>
-            declare(std::string& host,
+            declare(const std::string& host,
                     uint16_t     port,
                     bool         durable,
-                    std::string& src,
-                    std::string& dest,
-                    std::string& key,
+                    const std::string& src,
+                    const std::string& dest,
+                    const std::string& key,
                     bool         isQueue,
                     bool         isLocal,
-                    std::string& id,
-                    std::string& excludes,
+                    const std::string& id,
+                    const std::string& excludes,
                     bool         dynamic,
                     uint16_t     sync);
 

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.cpp?rev=1244023&r1=1244022&r2=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.cpp (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.cpp Tue Feb 14 15:59:49 2012
@@ -24,15 +24,19 @@
 #include "qpid/log/Statement.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/framing/reply_exceptions.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventBind.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
 
-using qmf::org::apache::qpid::broker::EventQueueDeclare;
-using qmf::org::apache::qpid::broker::EventQueueDelete;
+using qmf::org::apache::qpid::broker::EventBind;
 using qmf::org::apache::qpid::broker::EventExchangeDeclare;
 using qmf::org::apache::qpid::broker::EventExchangeDelete;
+using qmf::org::apache::qpid::broker::EventQueueDeclare;
+using qmf::org::apache::qpid::broker::EventQueueDelete;
+using qmf::org::apache::qpid::broker::EventSubscribe;
 
 namespace qpid {
 namespace broker {
@@ -55,27 +59,47 @@ NodeClone::NodeClone(const std::string& 
 
 NodeClone::~NodeClone() {}
 
+namespace {
+const std::string QPID_REPLICATE("qpid.replicate");
+const std::string ALL("all");
+const std::string WIRING("wiring");
+
+bool isReplicated(const std::string& value) {
+    return value == ALL || value == WIRING;
+}
+bool isReplicated(const framing::FieldTable& f) {
+    return f.isSet(QPID_REPLICATE) && isReplicated(f.getAsString(QPID_REPLICATE));
+}
+bool isReplicated(const types::Variant::Map& m) {
+    types::Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+    return i != m.end() && isReplicated(i->second.asString());
+}
+
+}
+
 void NodeClone::route(Deliverable& msg, const std::string& /*key*/, const qpid::framing::FieldTable*
headers)
 {
     if (isQMFv2(msg.getMessage()) && headers) {
-        if (headers->getAsString("qmf.content") == "_event") {
-            //decode as list
+        if (headers->getAsString("qmf.content") == "_event") { //decode as list
             std::string content = msg.getMessage().getFrames().getContent();
             qpid::types::Variant::List list;
             qpid::amqp_0_10::ListCodec::decode(content, list);
             if (list.empty()) {
-                QPID_LOG(error, "Error parsing QMF event, expected non-empty list");
+                QPID_LOG(error, "Error parsing QMF event, empty list");
             } else {
                 try {
+                    // FIXME aconway 2011-11-18: should be iterating list?
                     qpid::types::Variant::Map& map = list.front().asMap();
                     qpid::types::Variant::Map& schema = map["_schema_id"].asMap();
                     qpid::types::Variant::Map& values = map["_values"].asMap();
                     if (match<EventQueueDeclare>(schema)) {
-                        if (values["disp"] == "created" && values["args"].asMap()["qpid.propagate"])
{
+                        std::string name = values["qName"].asString();
+                        if (values["disp"] == "created" && isReplicated(values["args"].asMap()))
{
+                            QPID_LOG(debug, "Creating replicated queue " << name);
                             qpid::framing::FieldTable args;
                             qpid::amqp_0_10::translate(values["args"].asMap(), args);
                             if (!broker.createQueue(
-                                    values["qName"].asString(),
+                                    name,
                                     values["durable"].asBool(),
                                     values["autoDel"].asBool(),
                                     0 /*i.e. no owner regardless of exclusivity on master*/,
@@ -83,61 +107,60 @@ void NodeClone::route(Deliverable& msg, 
                                     args,
                                     values["user"].asString(),
                                     values["rhost"].asString()).second) {
-                                QPID_LOG(warning, "Propagatable queue " << values["qName"]
<< " already exists");
+                                QPID_LOG(warning, "Replicated queue " << name <<
" already exists");
                             }
                         }
                     } else if (match<EventQueueDelete>(schema)) {
                         std::string name = values["qName"].asString();
-                        QPID_LOG(debug, "Notified of deletion of queue " << name);
                         boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
-                        if (queue && queue->getSettings().isSet("qpid.propagate")/*TODO:
check value*/) {
+                        if (queue && isReplicated(queue->getSettings())) {
+                            QPID_LOG(debug, "Deleting replicated queue " << name);
                             broker.deleteQueue(
                                 name,
                                 values["user"].asString(),
                                 values["rhost"].asString());
-                        } else {
-                            if (queue) {
-                                QPID_LOG(debug, "Ignoring deletion notification for non-propagated
queue " << name);
-                            } else {
-                                QPID_LOG(debug, "No such queue " << name);
-                            }
                         }
                     } else if (match<EventExchangeDeclare>(schema)) {
-                        if (values["disp"] == "created" && values["args"].asMap()["qpid.propagate"])
{
+                        if (values["disp"] == "created" && isReplicated(values["args"].asMap()))
{
+                            std::string name = values["exName"].asString();
                             qpid::framing::FieldTable args;
                             qpid::amqp_0_10::translate(values["args"].asMap(), args);
+                            QPID_LOG(debug, "Creating replicated exchange " << name);
                             if (!broker.createExchange(
-                                    values["exName"].asString(),
+                                    name,
                                     values["exType"].asString(),
                                     values["durable"].asBool(),
                                     values["altEx"].asString(),
                                     args,
                                     values["user"].asString(),
                                     values["rhost"].asString()).second) {
-                                QPID_LOG(warning, "Propagatable queue " << values["qName"]
<< " already exists");
+                                QPID_LOG(warning, "Replicated exchange " << name <<
" already exists");
                             }
                         }
                     } else if (match<EventExchangeDelete>(schema)) {
                         std::string name = values["exName"].asString();
-                        QPID_LOG(debug, "Notified of deletion of exchange " << name);
                         try {
                             boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
-                            if (exchange && exchange->getArgs().isSet("qpid.propagate")/*TODO:
check value*/) {
+                            if (exchange && isReplicated(exchange->getArgs()))
{
+                                QPID_LOG(warning, "Deleting replicated exchange " <<
name);
                                 broker.deleteExchange(
                                     name,
                                     values["user"].asString(),
                                     values["rhost"].asString());
-                            } else {
-                                if (exchange) {
-                                    QPID_LOG(debug, "Ignoring deletion notification for non-propagated
exchange " << name);
-                                } else {
-                                    QPID_LOG(debug, "No such exchange " << name);
-                                }
                             }
                         } catch (const qpid::framing::NotFoundException&) {}
                     }
+                    else if (match<EventBind>(schema)) {
+                        QPID_LOG(error, "FIXME NodeClone: Not yet implemented - replicate
bindings.");
+                    }
+                    else if (match<EventSubscribe>(schema)) {
+                        // Deliberately ignore.
+                    }
+                    else {
+                        QPID_LOG(warning, "Replicator received unexpected event, schema="
<< schema);
+                    }
                 } catch (const std::exception& e) {
-                    QPID_LOG(error, "Error propagating configuration: " << e.what());
+                    QPID_LOG(error, "Error replicating configuration: " << e.what());
                 }
             }
         } else if (headers->getAsString("qmf.opcode") == "_query_response") {
@@ -145,15 +168,14 @@ void NodeClone::route(Deliverable& msg, 
             std::string content = msg.getMessage().getFrames().getContent();
             qpid::types::Variant::List list;
             qpid::amqp_0_10::ListCodec::decode(content, list);
-            QPID_LOG(debug, "Got query response (" << list.size() << ")");
             for (qpid::types::Variant::List::iterator i = list.begin(); i != list.end();
++i) {
                 std::string type = i->asMap()["_schema_id"].asMap()["_class_name"];
                 qpid::types::Variant::Map& values = i->asMap()["_values"].asMap();
-                QPID_LOG(debug, "class: " << type << ", values: " << values);
-                if (values["arguments"].asMap()["qpid.propagate"]) {
+                if (isReplicated(values["arguments"].asMap())) {
                     qpid::framing::FieldTable args;
                     qpid::amqp_0_10::translate(values["arguments"].asMap(), args);
                     if (type == "queue") {
+                        QPID_LOG(debug, "Creating replicated queue " << values["name"].asString()
<< " (in catch-up)");
                         if (!broker.createQueue(
                                 values["name"].asString(),
                                 values["durable"].asBool(),
@@ -163,9 +185,10 @@ void NodeClone::route(Deliverable& msg, 
                                 args,
                                 ""/*TODO: who is the user?*/,
                                 ""/*TODO: what should we use as connection id?*/).second)
{
-                            QPID_LOG(warning, "Propagatable queue " << values["name"]
<< " already exists");
+                            QPID_LOG(warning, "Replicated queue " << values["name"]
<< " already exists (in catch-up)");
                         }
                     } else if (type == "exchange") {
+                        QPID_LOG(debug, "Creating replicated exchange " << values["name"].asString()
<< " (in catch-up)");
                         if (!broker.createExchange(
                                 values["name"].asString(),
                                 values["type"].asString(),
@@ -174,18 +197,18 @@ void NodeClone::route(Deliverable& msg, 
                                 args,
                                 ""/*TODO: who is the user?*/,
                                 ""/*TODO: what should we use as connection id?*/).second)
{
-                            QPID_LOG(warning, "Propagatable queue " << values["qName"]
<< " already exists");
+                            QPID_LOG(warning, "Replicated exchange " << values["qName"]
<< " already exists (in catch-up)");
                         }
                     } else {
-                        QPID_LOG(warning, "Ignoring unknow object class: " << type);
+                        QPID_LOG(warning, "Replicator ignoring unexpected class: " <<
type);
                     }
                 }
             }
         } else {
-            QPID_LOG(debug, "Dropping QMFv2 message with headers: " << *headers);
+            QPID_LOG(warning, "Replicator ignoring QMFv2 message with headers: " <<
*headers);
         }
     } else {
-        QPID_LOG(warning, "Ignoring message which is not a valid QMFv2 event or query response");
+        QPID_LOG(warning, "Replicator ignoring message which is not a QMFv2 event or query
response");
     }
 }
 

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.h?rev=1244023&r1=1244022&r2=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.h (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.h Tue Feb 14 15:59:49 2012
@@ -23,6 +23,8 @@
  */
 #include "qpid/broker/Exchange.h"
 
+// FIXME aconway 2011-11-17: relocate to ../ha
+
 namespace qpid {
 namespace broker {
 

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/Connection.h?rev=1244023&r1=1244022&r2=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/Connection.h Tue Feb 14 15:59:49 2012
@@ -209,6 +209,8 @@ class Connection :
 
     void queueDequeueSincePurgeState(const std::string&, uint32_t);
 
+    bool isAnnounced() const { return announced; }
+
   private:
     struct NullFrameHandler : public framing::FrameHandler {
         void handle(framing::AMQFrame&) {}

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=1244023&r1=1244022&r2=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Tue Feb 14 15:59:49
2012
@@ -97,7 +97,7 @@ void OutputInterceptor::deliverDoOutput(
 }
 
 void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&)
{
-    if (parent.isLocal() && !sentDoOutput && !closing) {
+    if (parent.isLocal() && !sentDoOutput && !closing && parent.isAnnounced())
{
         sentDoOutput = true;
         parent.getCluster().getMulticast().mcastControl(
             ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit),

Added: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1244023&view=auto
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.cpp (added)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.cpp Tue Feb 14 15:59:49 2012
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Backup.h"
+#include "Settings.h"
+#include "qpid/Url.h"
+#include "qpid/broker/Broker.h"
+
+namespace qpid {
+namespace ha {
+
+Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
+    // Create a link to replicate wiring
+    if (s.brokerUrl != "dummy") {
+        Url url(s.brokerUrl);
+        QPID_LOG(info, "HA backup broker connecting to: " << url);
+
+        std::string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+        broker.getLinks().declare( // Declare the link
+            url[0].host, url[0].port, protocol,
+            false,              // durable
+            s.mechanism, s.username, s.password);
+
+        broker.getLinks().declare( // Declare the bridge
+            url[0].host, url[0].port,
+            false,              // durable
+            "qpid.node-cloner", // src
+            "qpid.node-cloner", // dest
+            "x",                // key
+            false,              // isQueue
+            false,              // isLocal
+            "",                 // id/tag
+            "",                 // excludes
+            false,              // dynamic
+            0);                 // sync?
+    }
+    // FIXME aconway 2011-11-17: need to enhance the link code to
+    // handle discovery of the primary broker and fail-over correctly.
+}
+
+}} // namespace qpid::ha

Propchange: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.h (from r1244022, qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h)
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.h?p2=qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.h&p1=qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h&r1=1244022&r2=1244023&rev=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.h Tue Feb 14 15:59:49 2012
@@ -1,5 +1,5 @@
-#ifndef QPID_HA_SETTINGS_H
-#define QPID_HA_SETTINGS_H
+#ifndef QPID_HA_BACKUP_H
+#define QPID_HA_BACKUP_H
 
 /*
  *
@@ -22,27 +22,29 @@
  *
  */
 
-#include <string>
+#include "Settings.h"
+#include "qpid/Url.h"
 
 namespace qpid {
-namespace ha {
+namespace broker {
+class Broker;
+}
 
-using std::string;
+namespace ha {
+class Settings;
 
 /**
- * Configurable settings for HA.
+ * State associated with a backup broker. Manages connections to primary.
  */
-class Settings
+class Backup
 {
   public:
-    Settings() : enabled(false) {}
-    bool enabled;
-    string status;              // primary, backup, solo
-    string clientUrl;
-    string brokerUrl;
-    string username, password, mechanism;
+    Backup(broker::Broker&, const Settings&);
+
   private:
+    broker::Broker& broker;
+    Settings settings;
 };
 }} // namespace qpid::ha
 
-#endif  /*!QPID_HA_SETTINGS_H*/
+#endif  /*!QPID_HA_BACKUP_H*/

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1244023&r1=1244022&r2=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.cpp Tue Feb 14 15:59:49 2012
@@ -18,10 +18,14 @@
  * under the License.
  *
  */
+#include "Backup.h"
 #include "HaBroker.h"
+#include "Settings.h"
+#include "qpid/Exception.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qmf/org/apache/qpid/ha/Package.h"
+#include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace ha {
@@ -30,7 +34,22 @@ namespace _qmf = ::qmf::org::apache::qpi
 using namespace management;
 using namespace std;
 
-HaBroker::HaBroker(broker::Broker& b) : broker(b), mgmtObject(0) {
+namespace {
+Url url(const std::string& s, const std::string& id) {
+    try {
+        return Url(s);
+    } catch (const std::exception& e) {
+        throw Exception(Msg() << "Invalid URL for " << id << ": '" <<
s << "'");
+    }
+}
+} // namespace
+
+HaBroker::HaBroker(broker::Broker& b, const Settings& s)
+    : broker(b),
+      clientUrl(url(s.clientUrl, "ha-client-url")),
+      brokerUrl(url(s.brokerUrl, "ha-broker-url")),
+      mgmtObject(0)
+{
     ManagementAgent* ma = broker.getManagementAgent();
     if (ma) {
         _qmf::Package  packageInit(ma);
@@ -39,8 +58,13 @@ HaBroker::HaBroker(broker::Broker& b) : 
         mgmtObject->set_status("solo");
         ma->addObject(mgmtObject);
     }
+    QPID_LOG(notice, "HA broker initialized, client-url=" << clientUrl
+             << ", broker-url=" << brokerUrl);
+    backup.reset(new Backup(broker, s));
 }
 
+HaBroker::~HaBroker() {}
+
 Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&)
{
     switch (methodId) {
       case _qmf::HaBroker::METHOD_SETSTATUS: {

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1244023&r1=1244022&r2=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.h Tue Feb 14 15:59:49 2012
@@ -22,9 +22,9 @@
  *
  */
 
+#include "qpid/Url.h"
 #include "qmf/org/apache/qpid/ha/HaBroker.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetStatus.h"
-
 #include "qpid/management/Manageable.h"
 
 namespace qpid {
@@ -32,6 +32,8 @@ namespace broker {
 class Broker;
 }
 namespace ha {
+class Settings;
+class Backup;
 
 /**
  * HA state and actions associated with a broker.
@@ -41,7 +43,8 @@ namespace ha {
 class HaBroker : public management::Manageable
 {
   public:
-    HaBroker(broker::Broker&);
+    HaBroker(broker::Broker&, const Settings&);
+    ~HaBroker();
 
     // Implement Manageable.
     qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject;
}
@@ -50,6 +53,8 @@ class HaBroker : public management::Mana
 
   private:
     broker::Broker& broker;
+    Url clientUrl, brokerUrl;
+    std::auto_ptr<Backup> backup;
     qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
 };
 }} // namespace qpid::ha

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1244023&r1=1244022&r2=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaPlugin.cpp Tue Feb 14 15:59:49 2012
@@ -57,7 +57,7 @@ struct HaPlugin : public Plugin {
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
         if (broker && settings.enabled) {
             QPID_LOG(info, "HA plugin enabled");
-            haBroker.reset(new ha::HaBroker(*broker));
+            haBroker.reset(new ha::HaBroker(*broker, settings));
         } else
             QPID_LOG(info, "HA plugin disabled");
     }

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h?rev=1244023&r1=1244022&r2=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h Tue Feb 14 15:59:49 2012
@@ -37,7 +37,6 @@ class Settings
   public:
     Settings() : enabled(false) {}
     bool enabled;
-    string status;              // primary, backup, solo
     string clientUrl;
     string brokerUrl;
     string username, password, mechanism;

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/tests/brokertest.py?rev=1244023&r1=1244022&r2=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/tests/brokertest.py Tue Feb 14 15:59:49 2012
@@ -444,6 +444,7 @@ class BrokerTest(TestCase):
     # Environment settings.
     qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
     cluster_lib = os.getenv("CLUSTER_LIB")
+    ha_lib = os.getenv("HA_LIB")
     xml_lib = os.getenv("XML_LIB")
     qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
     qpid_route_exec = os.getenv("QPID_ROUTE_EXEC")

Added: qpid/branches/qpid-3603-6/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/tests/ha_tests.py?rev=1244023&view=auto
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/tests/ha_tests.py (added)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/tests/ha_tests.py Tue Feb 14 15:59:49 2012
@@ -0,0 +1,107 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
+from qpid.messaging import Message, NotFound
+from brokertest import *
+from threading import Thread, Lock, Condition
+from logging import getLogger
+
+
+log = getLogger("qpid.ha-tests")
+
+class ShortTests(BrokerTest):
+    """Short HA functionality tests."""
+
+    def ha_broker(self, args=[], client_url="dummy", broker_url="dummy", **kwargs):
+        assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
+        return Broker(self, args=["--load-module", BrokerTest.ha_lib,
+                                  "--ha-enable=yes",
+                                  "--ha-client-url", client_url,
+                                  "--ha-broker-url", broker_url,
+                                  ] + args,
+                      **kwargs)
+
+    def setup_wiring(self, primary, backup):
+        cmd="qpid-route route add %s %s qpid.node-cloner x"%(backup, primary)
+        self.assertEqual(0, os.system(cmd))
+
+    def setup_replication(self, primary, backup, queue):
+        self.assertEqual(0,os.system("qpid-route --ack 1 queue add %s %s qpid.replicator-%s
%s"%(backup, primary, queue, queue)))
+
+    # FIXME aconway 2011-11-15: work around async replication.
+    def wait(self, session, address):
+        def check():
+            try:
+                session.receiver(address)
+                return True
+            except NotFound: return False
+        assert retry(check), "Timed out waiting for %s"%(address)
+
+    def assert_missing(self,session, address):
+        try:
+            session.receiver(a)
+            self.fail("Should not have been replicated: %s"%(address))
+        except NotFound: pass
+
+    def test_replicate_wiring(self):
+        queue="%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"
+        exchange="%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s},
type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"
+
+        # Create some wiring before starting the backup, to test catch-up
+        primary = self.ha_broker(name="primary")
+        s = primary.connect().session()
+        s.sender(queue%("q1", "all")).send(Message("1"))
+        s.sender(queue%("q2", "wiring")).send(Message("2"))
+        s.sender(queue%("q3", "none")).send(Message("3"))
+        s.sender(exchange%("e1", "all", "e1", "q2")).send(Message("4"))
+
+        # Create some after starting backup, test steady-state replication
+        backup  = self.ha_broker(name="backup", broker_url=primary.host_port())
+        s.sender(queue%("q01", "all")).send(Message("01"))
+        s.sender(queue%("q02", "wiring")).send(Message("02"))
+        s.sender(queue%("q03", "none")).send(Message("03"))
+        s.sender(exchange%("e01", "all", "e01", "q02")).send(Message("04"))
+
+        # Verify replication
+        # FIXME aconway 2011-11-18: We should kill primary here and fail over.
+        s = backup.connect().session()
+        for a in ["q01", "q02", "e01"]: self.wait(s,a)
+        # FIXME aconway 2011-11-18: replicate messages
+#         self.assert_browse(s, "q01", ["01", "04", "e01"])
+#         self.assert_browse(s, "q02", []) # wiring only
+#         self.assert_missing(s,"q03")
+        s.sender("e01").send(Message("e01")) # Verify bind
+        # FIXME aconway 2011-11-18: FIXME replicate bindings
+        # self.assert_browse(s, "q02", ["e01"])
+
+        for a in ["q1", "q2", "e1"]: self.wait(s,a)
+        # FIXME aconway 2011-11-18: replicate messages
+#         self.assert_browse(s, "q1", ["1", "4", "e1"])
+#         self.assert_browse(s, "q2", []) # wiring only
+#         self.assert_missing(s,"q3")
+        s.sender("e1").send(Message("e1")) # Verify bind
+        # FIXME aconway 2011-11-18: FIXME replicate bindings
+        # self.assert_browse(s, "q2", ["e1"])
+
+
+if __name__ == "__main__":
+    shutil.rmtree("brokertest.tmp", True)
+    os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])

Propchange: qpid/branches/qpid-3603-6/qpid/cpp/src/tests/ha_tests.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-3603-6/qpid/cpp/src/tests/ha_tests.py
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/tests/test_env.sh.in
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/tests/test_env.sh.in?rev=1244023&r1=1244022&r2=1244023&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/tests/test_env.sh.in (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/tests/test_env.sh.in Tue Feb 14 15:59:49 2012
@@ -63,6 +63,7 @@ export TEST_STORE_LIB=$testmoduledir/tes
 exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; }
 exportmodule ACL_LIB acl.so
 exportmodule CLUSTER_LIB cluster.so
+exportmodule HA_LIB ha.so
 exportmodule REPLICATING_LISTENER_LIB replicating_listener.so
 exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so
 exportmodule SSLCONNECTOR_LIB sslconnector.so



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message