qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1206353 - in /qpid/branches/qpid-3603/qpid/cpp/src: qpid/ha/Backup.cpp qpid/ha/HaBroker.cpp qpid/ha/HaPlugin.cpp qpid/ha/ReplicatingSubscription.cpp qpid/ha/ReplicatingSubscription.h qpid/ha/WiringReplicator.cpp tests/ha_tests.py
Date Fri, 25 Nov 2011 21:53:08 GMT
Author: aconway
Date: Fri Nov 25 21:53:05 2011
New Revision: 1206353

URL: http://svn.apache.org/viewvc?rev=1206353&view=rev
Log:
QPID-3603: In progress - integrate ReplicatingSubscription.

The code to use ReplicatingSubscription is there but it is disabled by
commenting out getConsumerFactories().add in Backup.cpp because it
hangs the test.

Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaPlugin.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1206353&r1=1206352&r2=1206353&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp Fri Nov 25 21:53:05 2011
@@ -21,6 +21,7 @@
 #include "Backup.h"
 #include "Settings.h"
 #include "WiringReplicator.h"
+#include "ReplicatingSubscription.h"
 #include "qpid/Url.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/broker/Bridge.h"
@@ -58,6 +59,12 @@ Backup::Backup(broker::Broker& b, const 
         link = result.first;
         boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
         broker.getExchanges().registerExchange(wr);
+
+        // FIXME aconway 2011-11-25: using ReplicatingSubscription hangs the tests
+        // The tests pass with a plain subscription if we dont add the factory.
+//         broker.getConsumerFactories().add(
+//             boost::shared_ptr<ReplicatingSubscription::Factory>(
+//                 new ReplicatingSubscription::Factory()));
     }
 }
 

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1206353&r1=1206352&r2=1206353&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Nov 25 21:53:05 2011
@@ -58,7 +58,7 @@ HaBroker::HaBroker(broker::Broker& b, co
         mgmtObject->set_status("solo");
         ma->addObject(mgmtObject);
     }
-    QPID_LOG(notice, "HA broker initialized, client-url=" << clientUrl
+    QPID_LOG(notice, "HA: broker initialized, client-url=" << clientUrl
              << ", broker-url=" << brokerUrl);
     backup.reset(new Backup(broker, s));
 }

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1206353&r1=1206352&r2=1206353&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaPlugin.cpp Fri Nov 25 21:53:05 2011
@@ -56,10 +56,10 @@ struct HaPlugin : public Plugin {
     void initialize(Plugin::Target& target) {
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
         if (broker && settings.enabled) {
-            QPID_LOG(info, "HA plugin enabled");
+            QPID_LOG(info, "HA: Enabled");
             haBroker.reset(new ha::HaBroker(*broker, settings));
         } else
-            QPID_LOG(info, "HA plugin disabled");
+            QPID_LOG(info, "HA: Disabled");
     }
 };
 

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1206353&r1=1206352&r2=1206353&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Nov 25 21:53:05 2011
@@ -66,6 +66,23 @@ std::string mask(const std::string& in)
     return DOLLAR + in + INTERNAL;
 }
 
+boost::shared_ptr<broker::SemanticState::ConsumerImpl>
+ReplicatingSubscription::Factory::create(
+    SemanticState* _parent,
+    const std::string& _name,
+    Queue::shared_ptr _queue,
+    bool ack,
+    bool _acquire,
+    bool _exclusive,
+    const std::string& _tag,
+    const std::string& _resumeId,
+    uint64_t _resumeTtl,
+    const framing::FieldTable& _arguments
+) {
+    return boost::shared_ptr<broker::SemanticState::ConsumerImpl>(
+        new ReplicatingSubscription(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments));
+}
+
 ReplicatingSubscription::ReplicatingSubscription(
     SemanticState* _parent,
     const std::string& _name,
@@ -81,7 +98,8 @@ ReplicatingSubscription::ReplicatingSubs
     events(new Queue(mask(_name))),
     consumer(new DelegatingConsumer(*this))
 {
-
+    // FIXME aconway 2011-11-25: string constants.
+    QPID_LOG(debug, "HA: replicating subscription " << _name << " to " << _queue->getName());
     if (_arguments.isSet("qpid.high_sequence_number")) {
         qpid::framing::SequenceNumber hwm = _arguments.getAsInt("qpid.high_sequence_number");
         qpid::framing::SequenceNumber lwm;

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1206353&r1=1206352&r2=1206353&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Nov 25 21:53:05 2011
@@ -24,6 +24,7 @@
 
 #include "qpid/broker/SemanticState.h"
 #include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/ConsumerFactory.h"
 
 namespace qpid {
 
@@ -43,11 +44,21 @@ class ReplicatingSubscription : public b
                                 public broker::QueueObserver
 {
   public:
+    struct Factory : public broker::ConsumerFactory {
+        boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
+            broker::SemanticState* parent,
+            const std::string& name, boost::shared_ptr<broker::Queue> ,
+            bool ack, bool acquire, bool exclusive, const std::string& tag,
+            const std::string& resumeId, uint64_t resumeTtl,
+            const framing::FieldTable& arguments);
+    };
+
     ReplicatingSubscription(broker::SemanticState* parent,
                             const std::string& name, boost::shared_ptr<broker::Queue> ,
                             bool ack, bool acquire, bool exclusive, const std::string& tag,
                             const std::string& resumeId, uint64_t resumeTtl,
                             const framing::FieldTable& arguments);
+
     ~ReplicatingSubscription();
 
     void init();

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1206353&r1=1206352&r2=1206353&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Fri Nov 25 21:53:05 2011
@@ -117,22 +117,18 @@ const string S_ALL="all";
 
 ReplicateLevel replicateLevel(const string& str) {
     // FIXME aconway 2011-11-24: case insenstive comparison.
-    QPID_LOG(critical, "FIXME replicateLevel " << str);
     ReplicateLevel rl = RL_NONE;
     if (str == S_WIRING) rl = RL_WIRING;
     else if (str == S_ALL) rl = RL_ALL;
-    QPID_LOG(critical, "FIXME replicateLevel " << str << " = " << rl);
     return rl;
 }
 
 ReplicateLevel replicateLevel(const framing::FieldTable& f) {
-    QPID_LOG(critical, "FIXME replicateLevel " << f);
     if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE));
     else return RL_NONE;
 }
 
 ReplicateLevel replicateLevel(const Variant::Map& m) {
-    QPID_LOG(critical, "FIXME replicateLevel " << m);
     Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
     if (i != m.end()) return replicateLevel(i->second.asString());
     else return RL_NONE;
@@ -234,6 +230,7 @@ void WiringReplicator::route(Deliverable
                 Variant::Map& map = list.front().asMap();
                 Variant::Map& schema = map[SCHEMA_ID].asMap();
                 Variant::Map& values = map[VALUES].asMap();
+                QPID_LOG(trace, "HA: Configuration event from primary: " << values);
                 if      (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
                 else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
                 else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
@@ -249,29 +246,30 @@ void WiringReplicator::route(Deliverable
                 Variant::Map& values = i->asMap()[VALUES].asMap();
                 framing::FieldTable args;
                 amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+                QPID_LOG(trace, "HA: Configuration response from primary: " << values);
                 if      (type == QUEUE) doResponseQueue(values);
                 else if (type == EXCHANGE) doResponseExchange(values);
                 else if (type == BINDING) doResponseBind(values);
-                else throw Exception(QPID_MSG("Ignoring unexpected class: " << type));
+                else throw Exception(QPID_MSG("HA: Unexpected response type: " << type));
             }
         } else {
-            QPID_LOG(warning, QPID_MSG("Replicator: Ignoring QMFv2 message with headers: " << *headers));
+            QPID_LOG(warning, QPID_MSG("HA: Expecting remote configuration message, got: " << *headers));
         }
     } catch (const std::exception& e) {
-        QPID_LOG(warning, "Replicator: Error replicating configuration: " << e.what());
-        QPID_LOG(debug, "Replicator: Error processing: " << list);
+        QPID_LOG(warning, "HA: Error replicating configuration: " << e.what());
+        QPID_LOG(debug, "HA: Error processing configuration message: " << list);
     }
 }
 
 void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
-    QPID_LOG(critical, "FIXME doEventQueueDeclare " << values);
     string name = values[QNAME].asString();
     Variant::Map argsMap = values[ARGS].asMap();
     if (values[DISP] == CREATED && replicateLevel(argsMap)) {
-        QPID_LOG(debug, "HA: Creating replicated queue " << name);
-        framing::FieldTable args;
+         framing::FieldTable args;
         amqp_0_10::translate(argsMap, args);
-        std::pair<boost::shared_ptr<Queue>, bool> result =
+
+        QPID_LOG(debug, "HA: Creating queue from event " << name);
+       std::pair<boost::shared_ptr<Queue>, bool> result =
             broker.createQueue(
                 name,
                 values[DURABLE].asBool(),
@@ -288,7 +286,7 @@ void WiringReplicator::doEventQueueDecla
             // out of date.
             startQueueReplicator(result.first);
         } else {
-            QPID_LOG(warning, "Replicated queue " << name << " already exists");
+            QPID_LOG(warning, "HA: Replicated queue " << name << " already exists");
         }
     }
 }
@@ -297,7 +295,7 @@ void WiringReplicator::doEventQueueDelet
     string name = values[QNAME].asString();
     boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
     if (queue && replicateLevel(queue->getSettings())) {
-        QPID_LOG(debug, "Deleting replicated queue " << name);
+        QPID_LOG(debug, "HA: Deleting queue from event: " << name);
         broker.deleteQueue(
             name,
             values[USER].asString(),
@@ -311,6 +309,7 @@ void WiringReplicator::doEventExchangeDe
         string name = values[EXNAME].asString();
         framing::FieldTable args;
         amqp_0_10::translate(argsMap, args);
+        QPID_LOG(debug, "HA: Creating exchange from event " << name);
         if (!broker.createExchange(
                 name,
                 values[EXTYPE].asString(),
@@ -331,7 +330,7 @@ void WiringReplicator::doEventExchangeDe
     try {
         boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
         if (exchange && replicateLevel(exchange->getArgs())) {
-            QPID_LOG(debug, "Deleting replicated exchange " << name);
+            QPID_LOG(debug, "HA: Deleting exchange:" << name);
             broker.deleteExchange(
                 name,
                 values[USER].asString(),
@@ -358,10 +357,8 @@ void WiringReplicator::doEventBind(Varia
 }
 
 void WiringReplicator::doResponseQueue(Variant::Map& values) {
-    QPID_LOG(critical, "FIXME doResponseQueue " << values);
     // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
     Variant::Map argsMap(values[ARGUMENTS].asMap());
-    QPID_LOG(critical, "FIXME doResponseQueue replevel " << replicateLevel(argsMap));
     if (!replicateLevel(argsMap)) return;
     framing::FieldTable args;
     amqp_0_10::translate(argsMap, args);
@@ -429,10 +426,8 @@ void WiringReplicator::doResponseBind(Va
     try {
         std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
         std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
-        QPID_LOG(critical, "FIXME doResponseBind " << qName << " to " << exName);
         boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(exName);
         boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
-        QPID_LOG(critical, "FIXME doResponseBind ptrs " << queue.get() << " to "  << exchange.get());
         // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
 
         // Automatically replicate exchange if queue and exchange are replicated
@@ -451,9 +446,7 @@ void WiringReplicator::doResponseBind(Va
 }
 
 void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
-    QPID_LOG(critical, "FIXME startQueueReplicator " << queue->getName() << " " << queue->getSettings());
     if (replicateLevel(queue->getSettings()) == RL_ALL) {
-        QPID_LOG(critical, "FIXME startQueueReplicator starting");
         boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
         broker.getExchanges().registerExchange(qr);
     }

Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py?rev=1206353&r1=1206352&r2=1206353&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py Fri Nov 25 21:53:05 2011
@@ -43,9 +43,6 @@ class ShortTests(BrokerTest):
         cmd="qpid-route route add %s %s qpid.node-cloner x"%(backup, primary)
         self.assertEqual(0, os.system(cmd))
 
-    def setup_replication(self, primary, backup, queue):
-        self.assertEqual(0,os.system("qpid-route --ack 1 queue add %s %s qpid.replicator-%s %s"%(backup, primary, queue, queue)))
-
     # FIXME aconway 2011-11-15: work around async replication.
     def wait(self, session, address):
         def check():



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message