qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1206346 - in /qpid/branches/qpid-3603/qpid/cpp/src/qpid: broker/Bridge.cpp broker/Bridge.h broker/LinkRegistry.cpp broker/LinkRegistry.h broker/SessionHandler.h ha/Backup.cpp
Date Fri, 25 Nov 2011 21:50:58 GMT
Author: aconway
Date: Fri Nov 25 21:50:56 2011
New Revision: 1206346

URL: http://svn.apache.org/viewvc?rev=1206346&view=rev
Log:
QPID-3603: Move init code for WiringReplicator out of Bridge into ha::Backup.

Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/LinkRegistry.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SessionHandler.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1206346&r1=1206345&r2=1206346&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp Fri Nov 25 21:50:56 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -59,9 +59,11 @@ void Bridge::PushHandler::handle(framing
 }
 
 Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
-               const _qmf::ArgsLinkBridge& _args) : 
+               const _qmf::ArgsLinkBridge& _args,
+               InitializeCallback init) :
     link(_link), id(_id), args(_args), mgmtObject(0),
-    listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0)
+    listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0),
+    initialize(init)
 {
     std::stringstream title;
     title << id << "_" << link->getBroker()->getFederationTag();
@@ -77,9 +79,9 @@ Bridge::Bridge(Link* _link, framing::Cha
     QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest);
 }
 
-Bridge::~Bridge() 
+Bridge::~Bridge()
 {
-    mgmtObject->resourceDestroy(); 
+    mgmtObject->resourceDestroy();
 }
 
 void Bridge::create(Connection& c)
@@ -98,7 +100,7 @@ void Bridge::create(Connection& c)
 
         session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
         peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
-        
+
         session->attach(name, false);
         session->commandPoint(0,0);
     } else {
@@ -108,7 +110,8 @@ void Bridge::create(Connection& c)
     }
 
     if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking();
-    if (args.i_srcIsQueue) {
+    if (initialize) initialize(*this, sessionHandler);
+    else if (args.i_srcIsQueue) {
         //TODO: something other than this which is nasty...
         bool isReplicatingLink = QueueReplicator::initReplicationSettings(args.i_dest, link->getBroker()->getQueues(),
options);
 
@@ -116,52 +119,6 @@ void Bridge::create(Connection& c)
         peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
         peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
         QPID_LOG(debug, "Activated route from queue " << args.i_src << " to "
<< args.i_dest);
-    } else if (ha::WiringReplicator::isWiringReplicatorDestination(args.i_dest)) {
-        //declare and bind an event queue
-        peer->getQueue().declare(queueName, "", false, false, true, true, FieldTable());
-        peer->getExchange().bind(queueName, "qmf.default.topic", "agent.ind.event.org_apache_qpid_broker.#",
FieldTable());
-        //subscribe to the queue
-        peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
-        peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
-        peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
-
-        //issue a query request for queues and another for exchanges using event queue as
the reply-to address
-        for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable utility functions
-            Variant::Map request;
-            request["_what"] = "OBJECT";
-            Variant::Map schema;
-            schema["_class_name"] = (i == 0 ? "queue" : "exchange");
-            schema["_package_name"] = "org.apache.qpid.broker";
-            request["_schema_id"] = schema;
-
-            AMQFrame method((MessageTransferBody(qpid::framing::ProtocolVersion(), "qmf.default.direct",
0, 0)));
-            method.setBof(true);
-            method.setEof(false);
-            method.setBos(true);
-            method.setEos(true);
-            AMQHeaderBody headerBody;
-            MessageProperties* props = headerBody.get<MessageProperties>(true);
-            props->setReplyTo(qpid::framing::ReplyTo("", queueName));
-            props->setAppId("qmf2");
-            props->getApplicationHeaders().setString("qmf.opcode", "_query_request");
-            headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker");
-            AMQFrame header(headerBody);
-            header.setBof(false);
-            header.setEof(false);
-            header.setBos(true);
-            header.setEos(true);
-            AMQContentBody data;
-            qpid::amqp_0_10::MapCodec::encode(request, data.getData());
-            AMQFrame content(data);
-            content.setBof(false);
-            content.setEof(true);
-            content.setBos(true);
-            content.setEos(true);
-            sessionHandler.out->handle(method);
-            sessionHandler.out->handle(header);
-            sessionHandler.out->handle(content);
-        }
-
     } else {
         FieldTable queueSettings;
 
@@ -236,11 +193,6 @@ void Bridge::setPersistenceId(uint64_t p
     persistenceId = pId;
 }
 
-const string& Bridge::getName() const
-{
-    return name;
-}
-
 Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
 {
     string   host;
@@ -268,7 +220,7 @@ Bridge::shared_ptr Bridge::decode(LinkRe
                          is_queue, is_local, id, excludes, dynamic, sync).first;
 }
 
-void Bridge::encode(Buffer& buffer) const 
+void Bridge::encode(Buffer& buffer) const
 {
     buffer.putShortString(string("bridge"));
     buffer.putShortString(link->getHost());
@@ -285,8 +237,8 @@ void Bridge::encode(Buffer& buffer) cons
     buffer.putShort(args.i_sync);
 }
 
-uint32_t Bridge::encodedSize() const 
-{ 
+uint32_t Bridge::encodedSize() const
+{
     return link->getHost().size() + 1 // short-string (host)
         + 7                // short-string ("bridge")
         + 2                // port
@@ -311,7 +263,7 @@ management::Manageable::status_t Bridge:
                                                           management::Args& /*args*/,
                                                           string&)
 {
-    if (methodId == _qmf::Bridge::METHOD_CLOSE) {  
+    if (methodId == _qmf::Bridge::METHOD_CLOSE) {
         //notify that we are closed
         destroy();
         return management::Manageable::STATUS_OK;
@@ -358,7 +310,7 @@ void Bridge::sendReorigin()
     conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
                                           queueName, args.i_src, args.i_key, bindArgs));
 }
-bool Bridge::resetProxy() 
+bool Bridge::resetProxy()
 {
     SessionHandler& sessionHandler = conn->getChannel(id);
     if (!sessionHandler.getSession()) peer.reset();

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.h?rev=1206346&r1=1206345&r2=1206346&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.h Fri Nov 25 21:50:56 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,15 +42,19 @@ class Connection;
 class ConnectionState;
 class Link;
 class LinkRegistry;
+class SessionHandler;
 
 class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge
 {
 public:
     typedef boost::shared_ptr<Bridge> shared_ptr;
     typedef boost::function<void(Bridge*)> CancellationListener;
+    typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback;
 
     Bridge(Link* link, framing::ChannelId id, CancellationListener l,
-           const qmf::org::apache::qpid::broker::ArgsLinkBridge& args);
+           const qmf::org::apache::qpid::broker::ArgsLinkBridge& args,
+           InitializeCallback init
+    );
     ~Bridge();
 
     void create(Connection& c);
@@ -70,8 +74,8 @@ public:
     void     setPersistenceId(uint64_t id) const;
     uint64_t getPersistenceId() const { return persistenceId; }
     uint32_t encodedSize() const;
-    void     encode(framing::Buffer& buffer) const; 
-    const std::string& getName() const;
+    void     encode(framing::Buffer& buffer) const;
+    const std::string& getName() const { return name; }
     static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
 
     // Exchange::DynamicBridge methods
@@ -81,6 +85,10 @@ public:
     bool containsLocalTag(const std::string& tagList) const;
     const std::string& getLocalTag() const;
 
+    // Methods needed by initialization functions
+    std::string getQueueName() const { return queueName; }
+    const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; }
+
 private:
     struct PushHandler : framing::FrameHandler {
         PushHandler(Connection* c) { conn = c; }
@@ -103,6 +111,7 @@ private:
     mutable uint64_t  persistenceId;
     ConnectionState* connState;
     Connection* conn;
+    InitializeCallback initialize;
 
     bool resetProxy();
 };

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1206346&r1=1206345&r2=1206346&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Fri Nov 25 21:50:56
2011
@@ -162,7 +162,9 @@ pair<Bridge::shared_ptr, bool> LinkRegis
                                                      const std::string& tag,
                                                      const std::string& excludes,
                                                      bool         dynamic,
-                                                     uint16_t     sync)
+                                                     uint16_t     sync,
+                                                     Bridge::InitializeCallback init
+)
 {
     Mutex::ScopedLock locker(lock);
     QPID_LOG(debug, "Bridge declared " << host << ": " << port <<
" from " << src << " to " << dest << " (" << key << ")");
@@ -196,7 +198,8 @@ pair<Bridge::shared_ptr, bool> LinkRegis
         bridge = Bridge::shared_ptr
             (new Bridge (l->second.get(), l->second->nextChannel(),
                          boost::bind(&LinkRegistry::destroy, this,
-                                     host, port, src, dest, key), args));
+                                     host, port, src, dest, key),
+                         args, init));
         bridges[bridgeKey] = bridge;
         l->second->add(bridge);
         return std::pair<Bridge::shared_ptr, bool>(bridge, true);

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=1206346&r1=1206345&r2=1206346&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/LinkRegistry.h Fri Nov 25 21:50:56 2011
@@ -91,6 +91,7 @@ namespace broker {
                     const std::string& authMechanism,
                     const std::string& username,
                     const std::string& password);
+
         std::pair<Bridge::shared_ptr, bool>
             declare(const std::string& host,
                     uint16_t     port,
@@ -103,9 +104,12 @@ namespace broker {
                     const std::string& id,
                     const std::string& excludes,
                     bool         dynamic,
-                    uint16_t     sync);
+                    uint16_t     sync,
+                    Bridge::InitializeCallback=0
+            );
 
         void destroy(const std::string& host, const uint16_t port);
+
         void destroy(const std::string& host,
                      const uint16_t     port,
                      const std::string& src,

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=1206346&r1=1206345&r2=1206346&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SessionHandler.h Fri Nov 25 21:50:56
2011
@@ -23,6 +23,7 @@
  */
 
 #include "qpid/amqp_0_10/SessionHandler.h"
+#include "qpid/broker/SessionHandler.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
 
 namespace qpid {

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1206346&r1=1206345&r2=1206346&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp Fri Nov 25 21:50:56 2011
@@ -21,11 +21,79 @@
 #include "Backup.h"
 #include "Settings.h"
 #include "qpid/Url.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/Bridge.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/types/Variant.h"
 
 namespace qpid {
 namespace ha {
 
+using namespace framing;
+using namespace broker;
+using types::Variant;
+
+namespace {
+const std::string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
+}
+
+// Initialize a bridge as a wiring replicator.
+void bridgeInitWiringReplicator(Bridge& bridge, SessionHandler& sessionHandler) {
+    framing::AMQP_ServerProxy peer(sessionHandler.out);
+    std::string queueName = bridge.getQueueName();
+    const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+
+    //declare and bind an event queue
+    peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable());
+    peer.getExchange().bind(queueName, "qmf.default.topic", "agent.ind.event.org_apache_qpid_broker.#",
FieldTable());
+    //subscribe to the queue
+    peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
+    peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+    peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+    //issue a query request for queues and another for exchanges using event queue as the
reply-to address
+    for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable utility functions
+        Variant::Map request;
+        request["_what"] = "OBJECT";
+        Variant::Map schema;
+        schema["_class_name"] = (i == 0 ? "queue" : "exchange");
+        schema["_package_name"] = "org.apache.qpid.broker";
+        request["_schema_id"] = schema;
+
+        AMQFrame method((MessageTransferBody(ProtocolVersion(), "qmf.default.direct", 0,
0)));
+        method.setBof(true);
+        method.setEof(false);
+        method.setBos(true);
+        method.setEos(true);
+        AMQHeaderBody headerBody;
+        MessageProperties* props = headerBody.get<MessageProperties>(true);
+        props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+        props->setAppId("qmf2");
+        props->getApplicationHeaders().setString("qmf.opcode", "_query_request");
+        headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker");
+        AMQFrame header(headerBody);
+        header.setBof(false);
+        header.setEof(false);
+        header.setBos(true);
+        header.setEos(true);
+        AMQContentBody data;
+        qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+        AMQFrame content(data);
+        content.setBof(false);
+        content.setEof(true);
+        content.setBos(true);
+        content.setEos(true);
+        sessionHandler.out->handle(method);
+        sessionHandler.out->handle(header);
+        sessionHandler.out->handle(content);
+    }
+}
+
 Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
     // Create a link to replicate wiring
     if (s.brokerUrl != "dummy") {
@@ -41,15 +109,17 @@ Backup::Backup(broker::Broker& b, const 
         broker.getLinks().declare( // Declare the bridge
             url[0].host, url[0].port,
             false,              // durable
-            "qpid.wiring-replicator", // src
-            "qpid.wiring-replicator", // dest
+            QPID_WIRING_REPLICATOR, // src
+            QPID_WIRING_REPLICATOR, // dest
             "x",                // key
             false,              // isQueue
             false,              // isLocal
             "",                 // id/tag
             "",                 // excludes
             false,              // dynamic
-            0);                 // sync?
+            0,                  // sync?
+            bridgeInitWiringReplicator
+        );
     }
     // FIXME aconway 2011-11-17: need to enhance the link code to
     // handle discovery of the primary broker and fail-over correctly.



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message