qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1206342 - in /qpid/branches/qpid-3603/qpid/cpp/src: qpid/broker/NodeClone.cpp qpid/broker/NodeClone.h tests/ha_tests.py
Date Fri, 25 Nov 2011 21:49:51 GMT
Author: aconway
Date: Fri Nov 25 21:49:50 2011
New Revision: 1206342

URL: http://svn.apache.org/viewvc?rev=1206342&view=rev
Log:
QPID-3603: Refactored NodeClone, breaking out create functions.

Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/NodeClone.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/NodeClone.h
    qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/NodeClone.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/NodeClone.cpp?rev=1206342&r1=1206341&r2=1206342&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/NodeClone.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/NodeClone.cpp Fri Nov 25 21:49:50 2011
@@ -41,25 +41,20 @@ using qmf::org::apache::qpid::broker::Ev
 namespace qpid {
 namespace broker {
 
+using types::Variant;
+
 namespace{
 bool isQMFv2(const Message& message)
 {
-    const qpid::framing::MessageProperties* props = message.getProperties<qpid::framing::MessageProperties>();
+    const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>();
     return props && props->getAppId() == "qmf2";
 }
 
-template <class T> bool match(qpid::types::Variant::Map& schema)
+template <class T> bool match(Variant::Map& schema)
 {
     return T::match(schema["_class_name"], schema["_package_name"]);
 }
 
-}
-
-NodeClone::NodeClone(const std::string& name, Broker& b) : Exchange(name), broker(b)
{}
-
-NodeClone::~NodeClone() {}
-
-namespace {
 const std::string QPID_REPLICATE("qpid.replicate");
 const std::string ALL("all");
 const std::string WIRING("wiring");
@@ -70,95 +65,42 @@ bool isReplicated(const std::string& val
 bool isReplicated(const framing::FieldTable& f) {
     return f.isSet(QPID_REPLICATE) && isReplicated(f.getAsString(QPID_REPLICATE));
 }
-bool isReplicated(const types::Variant::Map& m) {
-    types::Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+bool isReplicated(const Variant::Map& m) {
+    Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
     return i != m.end() && isReplicated(i->second.asString());
 }
 
-}
+} // namespace
 
-void NodeClone::route(Deliverable& msg, const std::string& /*key*/, const qpid::framing::FieldTable*
headers)
-{
+
+NodeClone::NodeClone(const std::string& name, Broker& b) : Exchange(name), broker(b)
{}
+
+NodeClone::~NodeClone() {}
+
+void NodeClone::route(Deliverable& msg, const std::string& /*key*/, const framing::FieldTable*
headers) {
+    // FIXME aconway 2011-11-21: outer error handling, e.g. for decoding error.
     if (isQMFv2(msg.getMessage()) && headers) {
+        // FIXME aconway 2011-11-21: string constants
         if (headers->getAsString("qmf.content") == "_event") { //decode as list
             std::string content = msg.getMessage().getFrames().getContent();
-            qpid::types::Variant::List list;
-            qpid::amqp_0_10::ListCodec::decode(content, list);
-            if (list.empty()) {
+            Variant::List list;
+            amqp_0_10::ListCodec::decode(content, list);
+            if (list.empty()) { // FIXME aconway 2011-11-21: remove
                 QPID_LOG(error, "Error parsing QMF event, empty list");
             } else {
                 try {
                     // FIXME aconway 2011-11-18: should be iterating list?
-                    qpid::types::Variant::Map& map = list.front().asMap();
-                    qpid::types::Variant::Map& schema = map["_schema_id"].asMap();
-                    qpid::types::Variant::Map& values = map["_values"].asMap();
-                    if (match<EventQueueDeclare>(schema)) {
-                        std::string name = values["qName"].asString();
-                        if (values["disp"] == "created" && isReplicated(values["args"].asMap()))
{
-                            QPID_LOG(debug, "Creating replicated queue " << name);
-                            qpid::framing::FieldTable args;
-                            qpid::amqp_0_10::translate(values["args"].asMap(), args);
-                            if (!broker.createQueue(
-                                    name,
-                                    values["durable"].asBool(),
-                                    values["autoDel"].asBool(),
-                                    0 /*i.e. no owner regardless of exclusivity on master*/,
-                                    values["altEx"].asString(),
-                                    args,
-                                    values["user"].asString(),
-                                    values["rhost"].asString()).second) {
-                                QPID_LOG(warning, "Replicated queue " << name <<
" already exists");
-                            }
-                        }
-                    } else if (match<EventQueueDelete>(schema)) {
-                        std::string name = values["qName"].asString();
-                        boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
-                        if (queue && isReplicated(queue->getSettings())) {
-                            QPID_LOG(debug, "Deleting replicated queue " << name);
-                            broker.deleteQueue(
-                                name,
-                                values["user"].asString(),
-                                values["rhost"].asString());
-                        }
-                    } else if (match<EventExchangeDeclare>(schema)) {
-                        if (values["disp"] == "created" && isReplicated(values["args"].asMap()))
{
-                            std::string name = values["exName"].asString();
-                            qpid::framing::FieldTable args;
-                            qpid::amqp_0_10::translate(values["args"].asMap(), args);
-                            QPID_LOG(debug, "Creating replicated exchange " << name);
-                            if (!broker.createExchange(
-                                    name,
-                                    values["exType"].asString(),
-                                    values["durable"].asBool(),
-                                    values["altEx"].asString(),
-                                    args,
-                                    values["user"].asString(),
-                                    values["rhost"].asString()).second) {
-                                QPID_LOG(warning, "Replicated exchange " << name <<
" already exists");
-                            }
-                        }
-                    } else if (match<EventExchangeDelete>(schema)) {
-                        std::string name = values["exName"].asString();
-                        try {
-                            boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
-                            if (exchange && isReplicated(exchange->getArgs()))
{
-                                QPID_LOG(warning, "Deleting replicated exchange " <<
name);
-                                broker.deleteExchange(
-                                    name,
-                                    values["user"].asString(),
-                                    values["rhost"].asString());
-                            } 
-                        } catch (const qpid::framing::NotFoundException&) {}
-                    }
-                    else if (match<EventBind>(schema)) {
-                        QPID_LOG(error, "FIXME NodeClone: Not yet implemented - replicate
bindings.");
-                    }
-                    else if (match<EventSubscribe>(schema)) {
-                        // Deliberately ignore.
-                    }
-                    else {
-                        QPID_LOG(warning, "Replicator received unexpected event, schema="
<< schema);
-                    }
+                    Variant::Map& map = list.front().asMap();
+                    Variant::Map& schema = map[
+                        "_schema_id"].asMap();
+                    Variant::Map& values = map["_values"].asMap();
+                    if      (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
+                    else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
+                    else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
+                    else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
+                    else if (match<EventBind>(schema)) doEventBind(values);
+                    else if (match<EventSubscribe>(schema)) {} // Deliberately ignored.
+                    else QPID_LOG(warning, "Replicator received unexpected event, schema="
<< schema);
                 } catch (const std::exception& e) {
                     QPID_LOG(error, "Error replicating configuration: " << e.what());
                 }
@@ -166,40 +108,18 @@ void NodeClone::route(Deliverable& msg, 
         } else if (headers->getAsString("qmf.opcode") == "_query_response") {
             //decode as list
             std::string content = msg.getMessage().getFrames().getContent();
-            qpid::types::Variant::List list;
-            qpid::amqp_0_10::ListCodec::decode(content, list);
-            for (qpid::types::Variant::List::iterator i = list.begin(); i != list.end();
++i) {
+            Variant::List list;
+            amqp_0_10::ListCodec::decode(content, list);
+            for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
                 std::string type = i->asMap()["_schema_id"].asMap()["_class_name"];
-                qpid::types::Variant::Map& values = i->asMap()["_values"].asMap();
+                Variant::Map& values = i->asMap()["_values"].asMap();
                 if (isReplicated(values["arguments"].asMap())) {
-                    qpid::framing::FieldTable args;
-                    qpid::amqp_0_10::translate(values["arguments"].asMap(), args);
-                    if (type == "queue") {
-                        QPID_LOG(debug, "Creating replicated queue " << values["name"].asString()
<< " (in catch-up)");
-                        if (!broker.createQueue(
-                                values["name"].asString(),
-                                values["durable"].asBool(),
-                                values["autoDelete"].asBool(),
-                                0 /*i.e. no owner regardless of exclusivity on master*/,
-                                ""/*TODO: need to include alternate-exchange*/,
-                                args,
-                                ""/*TODO: who is the user?*/,
-                                ""/*TODO: what should we use as connection id?*/).second)
{
-                            QPID_LOG(warning, "Replicated queue " << values["name"]
<< " already exists (in catch-up)");
-                        }
-                    } else if (type == "exchange") {
-                        QPID_LOG(debug, "Creating replicated exchange " << values["name"].asString()
<< " (in catch-up)");
-                        if (!broker.createExchange(
-                                values["name"].asString(),
-                                values["type"].asString(),
-                                values["durable"].asBool(),
-                                ""/*TODO: need to include alternate-exchange*/,
-                                args,
-                                ""/*TODO: who is the user?*/,
-                                ""/*TODO: what should we use as connection id?*/).second)
{
-                            QPID_LOG(warning, "Replicated exchange " << values["qName"]
<< " already exists (in catch-up)");
-                        }
-                    } else {
+                    framing::FieldTable args;
+                    amqp_0_10::translate(values["arguments"].asMap(), args);
+                    if      (type == "queue") doResponseQueue(values);
+                    else if (type == "exchange") doResponseExchange(values);
+                    else if (type == "bind") doResponseBind(values);
+                    else {
                         QPID_LOG(warning, "Replicator ignoring unexpected class: " <<
type);
                     }
                 }
@@ -212,9 +132,107 @@ void NodeClone::route(Deliverable& msg, 
     }
 }
 
-bool NodeClone::isNodeCloneDestination(const std::string& target)
-{
-    return target == "qpid.node-cloner";
+void NodeClone::doEventQueueDeclare(Variant::Map& values) {
+    std::string name = values["qName"].asString();
+    if (values["disp"] == "created" && isReplicated(values["args"].asMap())) {
+        QPID_LOG(debug, "Creating replicated queue " << name);
+        framing::FieldTable args;
+        amqp_0_10::translate(values["args"].asMap(), args);
+        if (!broker.createQueue(
+                name,
+                values["durable"].asBool(),
+                values["autoDel"].asBool(),
+                0 /*i.e. no owner regardless of exclusivity on master*/,
+                values["altEx"].asString(),
+                args,
+                values["user"].asString(),
+                values["rhost"].asString()).second) {
+            QPID_LOG(warning, "Replicated queue " << name << " already exists");
+        }
+    }
+}
+
+void NodeClone::doEventQueueDelete(Variant::Map& values) {
+    std::string name = values["qName"].asString();
+    boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
+    if (queue && isReplicated(queue->getSettings())) {
+        QPID_LOG(debug, "Deleting replicated queue " << name);
+        broker.deleteQueue(
+            name,
+            values["user"].asString(),
+            values["rhost"].asString());
+    }
+}
+
+void NodeClone::doEventExchangeDeclare(Variant::Map& values) {
+    if (values["disp"] == "created" && isReplicated(values["args"].asMap())) {
+        std::string name = values["exName"].asString();
+        framing::FieldTable args;
+        amqp_0_10::translate(values["args"].asMap(), args);
+        QPID_LOG(debug, "Creating replicated exchange " << name);
+        if (!broker.createExchange(
+                name,
+                values["exType"].asString(),
+                values["durable"].asBool(),
+                values["altEx"].asString(),
+                args,
+                values["user"].asString(),
+                values["rhost"].asString()).second) {
+            QPID_LOG(warning, "Replicated exchange " << name << " already exists");
+        }
+    }
+}
+
+void NodeClone::doEventExchangeDelete(Variant::Map& values) {
+    std::string name = values["exName"].asString();
+    try {
+        boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
+        if (exchange && isReplicated(exchange->getArgs())) {
+            QPID_LOG(warning, "Deleting replicated exchange " << name);
+            broker.deleteExchange(
+                name,
+                values["user"].asString(),
+                values["rhost"].asString());
+        } 
+    } catch (const framing::NotFoundException&) {}
+}
+
+void NodeClone::doEventBind(Variant::Map&) {
+    QPID_LOG(error, "FIXME NodeClone: Not yet implemented - replicate bindings.");
+    // FIXME aconway 2011-11-18: only replicated binds of replicated q to replicated ex.
+}
+
+void NodeClone::doResponseQueue(Variant::Map& values) {
+    QPID_LOG(debug, "Creating replicated queue " << values["name"].asString() <<
" (in catch-up)");
+    if (!broker.createQueue(
+            values["name"].asString(),
+            values["durable"].asBool(),
+            values["autoDelete"].asBool(),
+            0 /*i.e. no owner regardless of exclusivity on master*/,
+            ""/*TODO: need to include alternate-exchange*/,
+            args,
+            ""/*TODO: who is the user?*/,
+            ""/*TODO: what should we use as connection id?*/).second) {
+        QPID_LOG(warning, "Replicated queue " << values["name"] << " already
exists (in catch-up)");
+    }
+}
+
+void NodeClone::doResponseExchange(Variant::Map& values) {
+    QPID_LOG(debug, "Creating replicated exchange " << values["name"].asString() <<
" (in catch-up)");
+    if (!broker.createExchange(
+            values["name"].asString(),
+            values["type"].asString(),
+            values["durable"].asBool(),
+            ""/*TODO: need to include alternate-exchange*/,
+            args,
+            ""/*TODO: who is the user?*/,
+            ""/*TODO: what should we use as connection id?*/).second) {
+        QPID_LOG(warning, "Replicated exchange " << values["qName"] << " already
exists (in catch-up)");
+    }
+}
+
+void NodeClone::doResponseBind(Variant::Map& ) {
+    QPID_LOG(error, "FIXME NodeClone: Not yet implemented - catch-up replicate bindings.");
 }
 
 boost::shared_ptr<Exchange> NodeClone::create(const std::string& target, Broker&
broker)
@@ -228,15 +246,20 @@ boost::shared_ptr<Exchange> NodeClone::c
     return exchange;
 }
 
-bool NodeClone::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*)
{ return false; }
-bool NodeClone::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*)
{ return false; }
-bool NodeClone::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable*
const) { return false; }
+bool NodeClone::isNodeCloneDestination(const std::string& target)
+{
+    return target == "qpid.node-cloner";
+}
+
+bool NodeClone::bind(boost::shared_ptr<Queue>, const std::string&, const framing::FieldTable*)
{ return false; }
+bool NodeClone::unbind(boost::shared_ptr<Queue>, const std::string&, const framing::FieldTable*)
{ return false; }
+bool NodeClone::isBound(boost::shared_ptr<Queue>, const std::string* const, const framing::FieldTable*
const) { return false; }
 
-const std::string NodeClone::typeName("node-cloner");
+const std::string NodeClone::typeName("node-cloner"); // FIXME aconway 2011-11-21: qpid.replicator
 
 std::string NodeClone::getType() const
 {
     return typeName;
 }
 
-}} // namespace qpid::broker
+}} // namespace broker

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/NodeClone.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/NodeClone.h?rev=1206342&r1=1206341&r2=1206342&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/NodeClone.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/NodeClone.h Fri Nov 25 21:49:50 2011
@@ -21,11 +21,16 @@
  * under the License.
  *
  */
+
 #include "qpid/broker/Exchange.h"
+#include "qpid/types/Variant.h"
 
 // FIXME aconway 2011-11-17: relocate to ../ha
 
 namespace qpid {
+namespace types {
+class Variant;
+}
 namespace broker {
 
 class Broker;
@@ -49,6 +54,16 @@ class NodeClone : public Exchange
     static boost::shared_ptr<Exchange> create(const std::string&, Broker&);
     static const std::string typeName;
   private:
+
+    void doEventQueueDeclare(types::Variant::Map& values);
+    void doEventQueueDelete(types::Variant::Map& values);
+    void doEventExchangeDeclare(types::Variant::Map& values);
+    void doEventExchangeDelete(types::Variant::Map& values);
+    void doEventBind(types::Variant::Map&);
+    void doResponseQueue(types::Variant::Map& values);
+    void doResponseExchange(types::Variant::Map& values);
+    void doResponseBind(types::Variant::Map& values);
+
     Broker& broker;
 };
 }} // namespace qpid::broker

Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py?rev=1206342&r1=1206341&r2=1206342&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py Fri Nov 25 21:49:50 2011
@@ -89,8 +89,7 @@ class ShortTests(BrokerTest):
 #         self.assert_browse(s, "q02", []) # wiring only
 #         self.assert_missing(s,"q03")
         s.sender("e01").send(Message("e01")) # Verify bind
-        # FIXME aconway 2011-11-18: FIXME replicate bindings
-        # self.assert_browse(s, "q02", ["e01"]) 
+        self.assert_browse(s, "q02", ["e01"]) 
 
         for a in ["q1", "q2", "e1"]: self.wait(s,a)
         # FIXME aconway 2011-11-18: replicate messages
@@ -98,8 +97,7 @@ class ShortTests(BrokerTest):
 #         self.assert_browse(s, "q2", []) # wiring only
 #         self.assert_missing(s,"q3")
         s.sender("e1").send(Message("e1")) # Verify bind
-        # FIXME aconway 2011-11-18: FIXME replicate bindings
-        # self.assert_browse(s, "q2", ["e1"]) 
+        self.assert_browse(s, "q2", ["e1"]) 
 
 
 if __name__ == "__main__":



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message