Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CDEC79589 for ; Tue, 14 Feb 2012 16:00:20 +0000 (UTC) Received: (qmail 93432 invoked by uid 500); 14 Feb 2012 16:00:20 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 93410 invoked by uid 500); 14 Feb 2012 16:00:20 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 93403 invoked by uid 99); 14 Feb 2012 16:00:20 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Feb 2012 16:00:20 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,FILL_THIS_FORM X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Feb 2012 16:00:13 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 02DF523888CD for ; Tue, 14 Feb 2012 15:59:52 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1244023 - in /qpid/branches/qpid-3603-6/qpid/cpp/src: ./ qpid/broker/ qpid/cluster/ qpid/ha/ tests/ Date: Tue, 14 Feb 2012 15:59:51 -0000 To: commits@qpid.apache.org From: aconway@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120214155952.02DF523888CD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: aconway Date: Tue Feb 14 15:59:49 2012 New Revision: 1244023 URL: http://svn.apache.org/viewvc?rev=1244023&view=rev Log: QPID-3603: Automatic wiring replication for HA. Automatic replication of queues an exchanges. Bidnings TBD. Get rid of long delay establishing connections: - broker/Connection.cpp: requestIOProcessing() called before connection open saves work till connection is open. - broker/Link.cpp,LinkRegistry: Fix some const correctness errors. Added: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.cpp (with props) qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.h - copied, changed from r1244022, qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h qpid/branches/qpid-3603-6/qpid/cpp/src/tests/ha_tests.py (with props) Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/ha.mk qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Connection.cpp qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.cpp qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.h qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.cpp qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.h qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.cpp qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.h qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/Connection.h qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.cpp qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.h qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaPlugin.cpp qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h qpid/branches/qpid-3603-6/qpid/cpp/src/tests/brokertest.py qpid/branches/qpid-3603-6/qpid/cpp/src/tests/test_env.sh.in Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/ha.mk URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/ha.mk?rev=1244023&r1=1244022&r2=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/ha.mk (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/ha.mk Tue Feb 14 15:59:49 2012 @@ -26,6 +26,8 @@ ha_la_SOURCES = \ qpid/ha/HaPlugin.cpp \ qpid/ha/HaBroker.cpp \ qpid/ha/HaBroker.h \ + qpid/ha/Backup.cpp \ + qpid/ha/Backup.h \ qpid/ha/Settings.h ha_la_LIBADD = libqpidbroker.la Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1244023&r1=1244022&r2=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Connection.cpp (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Connection.cpp Tue Feb 14 15:59:49 2012 @@ -130,7 +130,7 @@ void Connection::requestIOProcessing(boo { ScopedLock l(ioCallbackLock); ioCallbacks.push(callback); - out.activateOutput(); + if (isOpen()) out.activateOutput(); } Connection::~Connection() @@ -156,11 +156,14 @@ Connection::~Connection() void Connection::received(framing::AMQFrame& frame) { // Received frame on connection so delay timeout restartTimeout(); + bool wasOpen = isOpen(); adapter.handle(frame); if (link) //i.e. we are acting as the client to another broker recordFromServer(frame); else recordFromClient(frame); + if (!wasOpen && isOpen()) + doIoCallbacks(); // Do any callbacks registered before we opened. } void Connection::sent(const framing::AMQFrame& frame) @@ -329,17 +332,16 @@ void Connection::closed(){ // Physically } void Connection::doIoCallbacks() { - { - ScopedLock l(ioCallbackLock); - // Although IO callbacks execute in the connection thread context, they are - // not cluster safe because they are queued for execution in non-IO threads. - ClusterUnsafeScope cus; - while (!ioCallbacks.empty()) { - boost::function0 cb = ioCallbacks.front(); - ioCallbacks.pop(); - ScopedUnlock ul(ioCallbackLock); - cb(); // Lend the IO thread for management processing - } + if (!isOpen()) return; // Don't process IO callbacks until we are open. + ScopedLock l(ioCallbackLock); + // Although IO callbacks execute in the connection thread context, they are + // not cluster safe because they are queued for execution in non-IO threads. + ClusterUnsafeScope cus; + while (!ioCallbacks.empty()) { + boost::function0 cb = ioCallbacks.front(); + ioCallbacks.pop(); + ScopedUnlock ul(ioCallbackLock); + cb(); // Lend the IO thread for management processing } } Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.cpp?rev=1244023&r1=1244022&r2=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.cpp (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.cpp Tue Feb 14 15:59:49 2012 @@ -47,13 +47,13 @@ namespace _qmf = qmf::org::apache::qpid: Link::Link(LinkRegistry* _links, MessageStore* _store, - string& _host, + const string& _host, uint16_t _port, - string& _transport, + const string& _transport, bool _durable, - string& _authMechanism, - string& _username, - string& _password, + const string& _authMechanism, + const string& _username, + const string& _password, Broker* _broker, Manageable* parent) : links(_links), store(_store), host(_host), port(_port), @@ -79,6 +79,7 @@ Link::Link(LinkRegistry* _links, } } setStateLH(STATE_WAITING); + startConnectionLH(); } Link::~Link () @@ -213,28 +214,30 @@ void Link::add(Bridge::shared_ptr bridge { Mutex::ScopedLock mutex(lock); created.push_back (bridge); + if (connection) + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + } void Link::cancel(Bridge::shared_ptr bridge) { - { - Mutex::ScopedLock mutex(lock); + Mutex::ScopedLock mutex(lock); - for (Bridges::iterator i = created.begin(); i != created.end(); i++) { - if ((*i).get() == bridge.get()) { - created.erase(i); - break; - } + for (Bridges::iterator i = created.begin(); i != created.end(); i++) { + if ((*i).get() == bridge.get()) { + created.erase(i); + break; } - for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - if ((*i).get() == bridge.get()) { - cancellations.push_back(bridge); - bridge->closed(); - active.erase(i); - break; - } + } + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + if ((*i).get() == bridge.get()) { + cancellations.push_back(bridge); + bridge->closed(); + active.erase(i); + break; } } + if (!cancellations.empty()) { connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } @@ -284,6 +287,8 @@ void Link::setConnection(Connection* c) Mutex::ScopedLock mutex(lock); connection = c; updateUrls = true; + // Process any IO tasks bridges added before setConnection. + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } void Link::maintenanceVisit () @@ -313,7 +318,7 @@ void Link::maintenanceVisit () } else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); -} + } void Link::reconnect(const qpid::Address& a) { Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.h?rev=1244023&r1=1244022&r2=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.h (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Link.h Tue Feb 14 15:59:49 2012 @@ -92,13 +92,13 @@ namespace qpid { Link(LinkRegistry* links, MessageStore* store, - std::string& host, + const std::string& host, uint16_t port, - std::string& transport, + const std::string& transport, bool durable, - std::string& authMechanism, - std::string& username, - std::string& password, + const std::string& authMechanism, + const std::string& username, + const std::string& password, Broker* broker, management::Manageable* parent = 0); virtual ~Link(); Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1244023&r1=1244022&r2=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Tue Feb 14 15:59:49 2012 @@ -124,13 +124,13 @@ bool LinkRegistry::updateAddress(const s } } -pair LinkRegistry::declare(string& host, +pair LinkRegistry::declare(const string& host, uint16_t port, - string& transport, + const string& transport, bool durable, - string& authMechanism, - string& username, - string& password) + const string& authMechanism, + const string& username, + const string& password) { Mutex::ScopedLock locker(lock); @@ -151,16 +151,16 @@ pair LinkRegistr return std::pair(i->second, false); } -pair LinkRegistry::declare(std::string& host, +pair LinkRegistry::declare(const std::string& host, uint16_t port, bool durable, - std::string& src, - std::string& dest, - std::string& key, + const std::string& src, + const std::string& dest, + const std::string& key, bool isQueue, bool isLocal, - std::string& tag, - std::string& excludes, + const std::string& tag, + const std::string& excludes, bool dynamic, uint16_t sync) { Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=1244023&r1=1244022&r2=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.h (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/LinkRegistry.h Tue Feb 14 15:59:49 2012 @@ -84,24 +84,24 @@ namespace broker { ~LinkRegistry(); std::pair, bool> - declare(std::string& host, + declare(const std::string& host, uint16_t port, - std::string& transport, + const std::string& transport, bool durable, - std::string& authMechanism, - std::string& username, - std::string& password); + const std::string& authMechanism, + const std::string& username, + const std::string& password); std::pair - declare(std::string& host, + declare(const std::string& host, uint16_t port, bool durable, - std::string& src, - std::string& dest, - std::string& key, + const std::string& src, + const std::string& dest, + const std::string& key, bool isQueue, bool isLocal, - std::string& id, - std::string& excludes, + const std::string& id, + const std::string& excludes, bool dynamic, uint16_t sync); Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.cpp?rev=1244023&r1=1244022&r2=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.cpp (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.cpp Tue Feb 14 15:59:49 2012 @@ -24,15 +24,19 @@ #include "qpid/log/Statement.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/framing/reply_exceptions.h" -#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" -#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" +#include "qmf/org/apache/qpid/broker/EventBind.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" +#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" +#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" +#include "qmf/org/apache/qpid/broker/EventSubscribe.h" -using qmf::org::apache::qpid::broker::EventQueueDeclare; -using qmf::org::apache::qpid::broker::EventQueueDelete; +using qmf::org::apache::qpid::broker::EventBind; using qmf::org::apache::qpid::broker::EventExchangeDeclare; using qmf::org::apache::qpid::broker::EventExchangeDelete; +using qmf::org::apache::qpid::broker::EventQueueDeclare; +using qmf::org::apache::qpid::broker::EventQueueDelete; +using qmf::org::apache::qpid::broker::EventSubscribe; namespace qpid { namespace broker { @@ -55,27 +59,47 @@ NodeClone::NodeClone(const std::string& NodeClone::~NodeClone() {} +namespace { +const std::string QPID_REPLICATE("qpid.replicate"); +const std::string ALL("all"); +const std::string WIRING("wiring"); + +bool isReplicated(const std::string& value) { + return value == ALL || value == WIRING; +} +bool isReplicated(const framing::FieldTable& f) { + return f.isSet(QPID_REPLICATE) && isReplicated(f.getAsString(QPID_REPLICATE)); +} +bool isReplicated(const types::Variant::Map& m) { + types::Variant::Map::const_iterator i = m.find(QPID_REPLICATE); + return i != m.end() && isReplicated(i->second.asString()); +} + +} + void NodeClone::route(Deliverable& msg, const std::string& /*key*/, const qpid::framing::FieldTable* headers) { if (isQMFv2(msg.getMessage()) && headers) { - if (headers->getAsString("qmf.content") == "_event") { - //decode as list + if (headers->getAsString("qmf.content") == "_event") { //decode as list std::string content = msg.getMessage().getFrames().getContent(); qpid::types::Variant::List list; qpid::amqp_0_10::ListCodec::decode(content, list); if (list.empty()) { - QPID_LOG(error, "Error parsing QMF event, expected non-empty list"); + QPID_LOG(error, "Error parsing QMF event, empty list"); } else { try { + // FIXME aconway 2011-11-18: should be iterating list? qpid::types::Variant::Map& map = list.front().asMap(); qpid::types::Variant::Map& schema = map["_schema_id"].asMap(); qpid::types::Variant::Map& values = map["_values"].asMap(); if (match(schema)) { - if (values["disp"] == "created" && values["args"].asMap()["qpid.propagate"]) { + std::string name = values["qName"].asString(); + if (values["disp"] == "created" && isReplicated(values["args"].asMap())) { + QPID_LOG(debug, "Creating replicated queue " << name); qpid::framing::FieldTable args; qpid::amqp_0_10::translate(values["args"].asMap(), args); if (!broker.createQueue( - values["qName"].asString(), + name, values["durable"].asBool(), values["autoDel"].asBool(), 0 /*i.e. no owner regardless of exclusivity on master*/, @@ -83,61 +107,60 @@ void NodeClone::route(Deliverable& msg, args, values["user"].asString(), values["rhost"].asString()).second) { - QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists"); + QPID_LOG(warning, "Replicated queue " << name << " already exists"); } } } else if (match(schema)) { std::string name = values["qName"].asString(); - QPID_LOG(debug, "Notified of deletion of queue " << name); boost::shared_ptr queue = broker.getQueues().find(name); - if (queue && queue->getSettings().isSet("qpid.propagate")/*TODO: check value*/) { + if (queue && isReplicated(queue->getSettings())) { + QPID_LOG(debug, "Deleting replicated queue " << name); broker.deleteQueue( name, values["user"].asString(), values["rhost"].asString()); - } else { - if (queue) { - QPID_LOG(debug, "Ignoring deletion notification for non-propagated queue " << name); - } else { - QPID_LOG(debug, "No such queue " << name); - } } } else if (match(schema)) { - if (values["disp"] == "created" && values["args"].asMap()["qpid.propagate"]) { + if (values["disp"] == "created" && isReplicated(values["args"].asMap())) { + std::string name = values["exName"].asString(); qpid::framing::FieldTable args; qpid::amqp_0_10::translate(values["args"].asMap(), args); + QPID_LOG(debug, "Creating replicated exchange " << name); if (!broker.createExchange( - values["exName"].asString(), + name, values["exType"].asString(), values["durable"].asBool(), values["altEx"].asString(), args, values["user"].asString(), values["rhost"].asString()).second) { - QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists"); + QPID_LOG(warning, "Replicated exchange " << name << " already exists"); } } } else if (match(schema)) { std::string name = values["exName"].asString(); - QPID_LOG(debug, "Notified of deletion of exchange " << name); try { boost::shared_ptr exchange = broker.getExchanges().get(name); - if (exchange && exchange->getArgs().isSet("qpid.propagate")/*TODO: check value*/) { + if (exchange && isReplicated(exchange->getArgs())) { + QPID_LOG(warning, "Deleting replicated exchange " << name); broker.deleteExchange( name, values["user"].asString(), values["rhost"].asString()); - } else { - if (exchange) { - QPID_LOG(debug, "Ignoring deletion notification for non-propagated exchange " << name); - } else { - QPID_LOG(debug, "No such exchange " << name); - } } } catch (const qpid::framing::NotFoundException&) {} } + else if (match(schema)) { + QPID_LOG(error, "FIXME NodeClone: Not yet implemented - replicate bindings."); + } + else if (match(schema)) { + // Deliberately ignore. + } + else { + QPID_LOG(warning, "Replicator received unexpected event, schema=" << schema); + } } catch (const std::exception& e) { - QPID_LOG(error, "Error propagating configuration: " << e.what()); + QPID_LOG(error, "Error replicating configuration: " << e.what()); } } } else if (headers->getAsString("qmf.opcode") == "_query_response") { @@ -145,15 +168,14 @@ void NodeClone::route(Deliverable& msg, std::string content = msg.getMessage().getFrames().getContent(); qpid::types::Variant::List list; qpid::amqp_0_10::ListCodec::decode(content, list); - QPID_LOG(debug, "Got query response (" << list.size() << ")"); for (qpid::types::Variant::List::iterator i = list.begin(); i != list.end(); ++i) { std::string type = i->asMap()["_schema_id"].asMap()["_class_name"]; qpid::types::Variant::Map& values = i->asMap()["_values"].asMap(); - QPID_LOG(debug, "class: " << type << ", values: " << values); - if (values["arguments"].asMap()["qpid.propagate"]) { + if (isReplicated(values["arguments"].asMap())) { qpid::framing::FieldTable args; qpid::amqp_0_10::translate(values["arguments"].asMap(), args); if (type == "queue") { + QPID_LOG(debug, "Creating replicated queue " << values["name"].asString() << " (in catch-up)"); if (!broker.createQueue( values["name"].asString(), values["durable"].asBool(), @@ -163,9 +185,10 @@ void NodeClone::route(Deliverable& msg, args, ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/).second) { - QPID_LOG(warning, "Propagatable queue " << values["name"] << " already exists"); + QPID_LOG(warning, "Replicated queue " << values["name"] << " already exists (in catch-up)"); } } else if (type == "exchange") { + QPID_LOG(debug, "Creating replicated exchange " << values["name"].asString() << " (in catch-up)"); if (!broker.createExchange( values["name"].asString(), values["type"].asString(), @@ -174,18 +197,18 @@ void NodeClone::route(Deliverable& msg, args, ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/).second) { - QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists"); + QPID_LOG(warning, "Replicated exchange " << values["qName"] << " already exists (in catch-up)"); } } else { - QPID_LOG(warning, "Ignoring unknow object class: " << type); + QPID_LOG(warning, "Replicator ignoring unexpected class: " << type); } } } } else { - QPID_LOG(debug, "Dropping QMFv2 message with headers: " << *headers); + QPID_LOG(warning, "Replicator ignoring QMFv2 message with headers: " << *headers); } } else { - QPID_LOG(warning, "Ignoring message which is not a valid QMFv2 event or query response"); + QPID_LOG(warning, "Replicator ignoring message which is not a QMFv2 event or query response"); } } Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.h?rev=1244023&r1=1244022&r2=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.h (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/NodeClone.h Tue Feb 14 15:59:49 2012 @@ -23,6 +23,8 @@ */ #include "qpid/broker/Exchange.h" +// FIXME aconway 2011-11-17: relocate to ../ha + namespace qpid { namespace broker { Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/Connection.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/Connection.h?rev=1244023&r1=1244022&r2=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/Connection.h (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/Connection.h Tue Feb 14 15:59:49 2012 @@ -209,6 +209,8 @@ class Connection : void queueDequeueSincePurgeState(const std::string&, uint32_t); + bool isAnnounced() const { return announced; } + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=1244023&r1=1244022&r2=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Tue Feb 14 15:59:49 2012 @@ -97,7 +97,7 @@ void OutputInterceptor::deliverDoOutput( } void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&) { - if (parent.isLocal() && !sentDoOutput && !closing) { + if (parent.isLocal() && !sentDoOutput && !closing && parent.isAnnounced()) { sentDoOutput = true; parent.getCluster().getMulticast().mcastControl( ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit), Added: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1244023&view=auto ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.cpp (added) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.cpp Tue Feb 14 15:59:49 2012 @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Backup.h" +#include "Settings.h" +#include "qpid/Url.h" +#include "qpid/broker/Broker.h" + +namespace qpid { +namespace ha { + +Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) { + // Create a link to replicate wiring + if (s.brokerUrl != "dummy") { + Url url(s.brokerUrl); + QPID_LOG(info, "HA backup broker connecting to: " << url); + + std::string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + broker.getLinks().declare( // Declare the link + url[0].host, url[0].port, protocol, + false, // durable + s.mechanism, s.username, s.password); + + broker.getLinks().declare( // Declare the bridge + url[0].host, url[0].port, + false, // durable + "qpid.node-cloner", // src + "qpid.node-cloner", // dest + "x", // key + false, // isQueue + false, // isLocal + "", // id/tag + "", // excludes + false, // dynamic + 0); // sync? + } + // FIXME aconway 2011-11-17: need to enhance the link code to + // handle discovery of the primary broker and fail-over correctly. +} + +}} // namespace qpid::ha Propchange: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Copied: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.h (from r1244022, qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h) URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.h?p2=qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.h&p1=qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h&r1=1244022&r2=1244023&rev=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Backup.h Tue Feb 14 15:59:49 2012 @@ -1,5 +1,5 @@ -#ifndef QPID_HA_SETTINGS_H -#define QPID_HA_SETTINGS_H +#ifndef QPID_HA_BACKUP_H +#define QPID_HA_BACKUP_H /* * @@ -22,27 +22,29 @@ * */ -#include +#include "Settings.h" +#include "qpid/Url.h" namespace qpid { -namespace ha { +namespace broker { +class Broker; +} -using std::string; +namespace ha { +class Settings; /** - * Configurable settings for HA. + * State associated with a backup broker. Manages connections to primary. */ -class Settings +class Backup { public: - Settings() : enabled(false) {} - bool enabled; - string status; // primary, backup, solo - string clientUrl; - string brokerUrl; - string username, password, mechanism; + Backup(broker::Broker&, const Settings&); + private: + broker::Broker& broker; + Settings settings; }; }} // namespace qpid::ha -#endif /*!QPID_HA_SETTINGS_H*/ +#endif /*!QPID_HA_BACKUP_H*/ Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1244023&r1=1244022&r2=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.cpp (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.cpp Tue Feb 14 15:59:49 2012 @@ -18,10 +18,14 @@ * under the License. * */ +#include "Backup.h" #include "HaBroker.h" +#include "Settings.h" +#include "qpid/Exception.h" #include "qpid/broker/Broker.h" #include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/ha/Package.h" +#include "qpid/log/Statement.h" namespace qpid { namespace ha { @@ -30,7 +34,22 @@ namespace _qmf = ::qmf::org::apache::qpi using namespace management; using namespace std; -HaBroker::HaBroker(broker::Broker& b) : broker(b), mgmtObject(0) { +namespace { +Url url(const std::string& s, const std::string& id) { + try { + return Url(s); + } catch (const std::exception& e) { + throw Exception(Msg() << "Invalid URL for " << id << ": '" << s << "'"); + } +} +} // namespace + +HaBroker::HaBroker(broker::Broker& b, const Settings& s) + : broker(b), + clientUrl(url(s.clientUrl, "ha-client-url")), + brokerUrl(url(s.brokerUrl, "ha-broker-url")), + mgmtObject(0) +{ ManagementAgent* ma = broker.getManagementAgent(); if (ma) { _qmf::Package packageInit(ma); @@ -39,8 +58,13 @@ HaBroker::HaBroker(broker::Broker& b) : mgmtObject->set_status("solo"); ma->addObject(mgmtObject); } + QPID_LOG(notice, "HA broker initialized, client-url=" << clientUrl + << ", broker-url=" << brokerUrl); + backup.reset(new Backup(broker, s)); } +HaBroker::~HaBroker() {} + Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) { switch (methodId) { case _qmf::HaBroker::METHOD_SETSTATUS: { Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1244023&r1=1244022&r2=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.h (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.h Tue Feb 14 15:59:49 2012 @@ -22,9 +22,9 @@ * */ +#include "qpid/Url.h" #include "qmf/org/apache/qpid/ha/HaBroker.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetStatus.h" - #include "qpid/management/Manageable.h" namespace qpid { @@ -32,6 +32,8 @@ namespace broker { class Broker; } namespace ha { +class Settings; +class Backup; /** * HA state and actions associated with a broker. @@ -41,7 +43,8 @@ namespace ha { class HaBroker : public management::Manageable { public: - HaBroker(broker::Broker&); + HaBroker(broker::Broker&, const Settings&); + ~HaBroker(); // Implement Manageable. qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject; } @@ -50,6 +53,8 @@ class HaBroker : public management::Mana private: broker::Broker& broker; + Url clientUrl, brokerUrl; + std::auto_ptr backup; qmf::org::apache::qpid::ha::HaBroker* mgmtObject; }; }} // namespace qpid::ha Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaPlugin.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1244023&r1=1244022&r2=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaPlugin.cpp Tue Feb 14 15:59:49 2012 @@ -57,7 +57,7 @@ struct HaPlugin : public Plugin { broker::Broker* broker = dynamic_cast(&target); if (broker && settings.enabled) { QPID_LOG(info, "HA plugin enabled"); - haBroker.reset(new ha::HaBroker(*broker)); + haBroker.reset(new ha::HaBroker(*broker, settings)); } else QPID_LOG(info, "HA plugin disabled"); } Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h?rev=1244023&r1=1244022&r2=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/Settings.h Tue Feb 14 15:59:49 2012 @@ -37,7 +37,6 @@ class Settings public: Settings() : enabled(false) {} bool enabled; - string status; // primary, backup, solo string clientUrl; string brokerUrl; string username, password, mechanism; Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/tests/brokertest.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/tests/brokertest.py?rev=1244023&r1=1244022&r2=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/tests/brokertest.py (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/tests/brokertest.py Tue Feb 14 15:59:49 2012 @@ -444,6 +444,7 @@ class BrokerTest(TestCase): # Environment settings. qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC")) cluster_lib = os.getenv("CLUSTER_LIB") + ha_lib = os.getenv("HA_LIB") xml_lib = os.getenv("XML_LIB") qpid_config_exec = os.getenv("QPID_CONFIG_EXEC") qpid_route_exec = os.getenv("QPID_ROUTE_EXEC") Added: qpid/branches/qpid-3603-6/qpid/cpp/src/tests/ha_tests.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/tests/ha_tests.py?rev=1244023&view=auto ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/tests/ha_tests.py (added) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/tests/ha_tests.py Tue Feb 14 15:59:49 2012 @@ -0,0 +1,107 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil +from qpid.messaging import Message, NotFound +from brokertest import * +from threading import Thread, Lock, Condition +from logging import getLogger + + +log = getLogger("qpid.ha-tests") + +class ShortTests(BrokerTest): + """Short HA functionality tests.""" + + def ha_broker(self, args=[], client_url="dummy", broker_url="dummy", **kwargs): + assert BrokerTest.ha_lib, "Cannot locate HA plug-in" + return Broker(self, args=["--load-module", BrokerTest.ha_lib, + "--ha-enable=yes", + "--ha-client-url", client_url, + "--ha-broker-url", broker_url, + ] + args, + **kwargs) + + def setup_wiring(self, primary, backup): + cmd="qpid-route route add %s %s qpid.node-cloner x"%(backup, primary) + self.assertEqual(0, os.system(cmd)) + + def setup_replication(self, primary, backup, queue): + self.assertEqual(0,os.system("qpid-route --ack 1 queue add %s %s qpid.replicator-%s %s"%(backup, primary, queue, queue))) + + # FIXME aconway 2011-11-15: work around async replication. + def wait(self, session, address): + def check(): + try: + session.receiver(address) + return True + except NotFound: return False + assert retry(check), "Timed out waiting for %s"%(address) + + def assert_missing(self,session, address): + try: + session.receiver(a) + self.fail("Should not have been replicated: %s"%(address)) + except NotFound: pass + + def test_replicate_wiring(self): + queue="%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}" + exchange="%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}" + + # Create some wiring before starting the backup, to test catch-up + primary = self.ha_broker(name="primary") + s = primary.connect().session() + s.sender(queue%("q1", "all")).send(Message("1")) + s.sender(queue%("q2", "wiring")).send(Message("2")) + s.sender(queue%("q3", "none")).send(Message("3")) + s.sender(exchange%("e1", "all", "e1", "q2")).send(Message("4")) + + # Create some after starting backup, test steady-state replication + backup = self.ha_broker(name="backup", broker_url=primary.host_port()) + s.sender(queue%("q01", "all")).send(Message("01")) + s.sender(queue%("q02", "wiring")).send(Message("02")) + s.sender(queue%("q03", "none")).send(Message("03")) + s.sender(exchange%("e01", "all", "e01", "q02")).send(Message("04")) + + # Verify replication + # FIXME aconway 2011-11-18: We should kill primary here and fail over. + s = backup.connect().session() + for a in ["q01", "q02", "e01"]: self.wait(s,a) + # FIXME aconway 2011-11-18: replicate messages +# self.assert_browse(s, "q01", ["01", "04", "e01"]) +# self.assert_browse(s, "q02", []) # wiring only +# self.assert_missing(s,"q03") + s.sender("e01").send(Message("e01")) # Verify bind + # FIXME aconway 2011-11-18: FIXME replicate bindings + # self.assert_browse(s, "q02", ["e01"]) + + for a in ["q1", "q2", "e1"]: self.wait(s,a) + # FIXME aconway 2011-11-18: replicate messages +# self.assert_browse(s, "q1", ["1", "4", "e1"]) +# self.assert_browse(s, "q2", []) # wiring only +# self.assert_missing(s,"q3") + s.sender("e1").send(Message("e1")) # Verify bind + # FIXME aconway 2011-11-18: FIXME replicate bindings + # self.assert_browse(s, "q2", ["e1"]) + + +if __name__ == "__main__": + shutil.rmtree("brokertest.tmp", True) + os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:]) Propchange: qpid/branches/qpid-3603-6/qpid/cpp/src/tests/ha_tests.py ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/branches/qpid-3603-6/qpid/cpp/src/tests/ha_tests.py ------------------------------------------------------------------------------ svn:executable = * Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/tests/test_env.sh.in URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/tests/test_env.sh.in?rev=1244023&r1=1244022&r2=1244023&view=diff ============================================================================== --- qpid/branches/qpid-3603-6/qpid/cpp/src/tests/test_env.sh.in (original) +++ qpid/branches/qpid-3603-6/qpid/cpp/src/tests/test_env.sh.in Tue Feb 14 15:59:49 2012 @@ -63,6 +63,7 @@ export TEST_STORE_LIB=$testmoduledir/tes exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; } exportmodule ACL_LIB acl.so exportmodule CLUSTER_LIB cluster.so +exportmodule HA_LIB ha.so exportmodule REPLICATING_LISTENER_LIB replicating_listener.so exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so exportmodule SSLCONNECTOR_LIB sslconnector.so --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org