qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1206351 - in /qpid/branches/qpid-3603/qpid/cpp/src: ./ qpid/broker/ qpid/ha/ tests/
Date Fri, 25 Nov 2011 21:52:27 GMT
Author: aconway
Date: Fri Nov 25 21:52:16 2011
New Revision: 1206351

URL: http://svn.apache.org/viewvc?rev=1206351&view=rev
Log:
QPID-3603: Automatic wiring and message replication.

Added:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp   (contents, props changed)
      - copied, changed from r1206350, qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h   (contents, props changed)
      - copied, changed from r1206350, qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueReplicator.h
Removed:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueReplicator.h
Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am
    qpid/branches/qpid-3603/qpid/cpp/src/ha.mk
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Link.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.h
    qpid/branches/qpid-3603/qpid/cpp/src/tests/brokertest.py
    qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am?rev=1206351&r1=1206350&r2=1206351&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am Fri Nov 25 21:52:16 2011
@@ -622,8 +622,6 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/QueuedMessage.h \
   qpid/broker/QueueFlowLimit.h \
   qpid/broker/QueueFlowLimit.cpp \
-  qpid/broker/QueueReplicator.h \
-  qpid/broker/QueueReplicator.cpp \
   qpid/broker/ReplicatingSubscription.h \
   qpid/broker/ReplicatingSubscription.cpp \
   qpid/broker/RateFlowcontrol.h \

Modified: qpid/branches/qpid-3603/qpid/cpp/src/ha.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/ha.mk?rev=1206351&r1=1206350&r2=1206351&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/ha.mk (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/ha.mk Fri Nov 25 21:52:16 2011
@@ -29,6 +29,8 @@ ha_la_SOURCES =					\
   qpid/ha/HaBroker.h				\
   qpid/ha/HaPlugin.cpp				\
   qpid/ha/Settings.h				\
+  qpid/ha/QueueReplicator.h			\
+  qpid/ha/QueueReplicator.cpp			\
   qpid/ha/WiringReplicator.cpp			\
   qpid/ha/WiringReplicator.h
 

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1206351&r1=1206350&r2=1206351&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp Fri Nov 25 21:52:16 2011
@@ -25,7 +25,6 @@
 #include "qpid/broker/Link.h"
 #include "qpid/broker/LinkRegistry.h"
 #include "qpid/ha/WiringReplicator.h"
-#include "qpid/broker/QueueReplicator.h"
 #include "qpid/broker/SessionState.h"
 
 #include "qpid/management/ManagementAgent.h"
@@ -112,10 +111,7 @@ void Bridge::create(Connection& c)
     if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking();
     if (initialize) initialize(*this, sessionHandler);
     else if (args.i_srcIsQueue) {
-        //TODO: something other than this which is nasty...
-        bool isReplicatingLink = QueueReplicator::initReplicationSettings(args.i_dest, link->getBroker()->getQueues(),
options);
-
-        peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, isReplicatingLink
? 1 : 0, false, "", 0, options);
+        peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0,
false, "", 0, options);
         peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
         peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
         QPID_LOG(debug, "Activated route from queue " << args.i_src << " to "
<< args.i_dest);

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Link.h?rev=1206351&r1=1206350&r2=1206351&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Link.h Fri Nov 25 21:52:16 2011
@@ -105,6 +105,8 @@ namespace qpid {
 
             std::string getHost() { return host; }
             uint16_t    getPort() { return port; }
+            std::string getTransport() { return transport; }
+
             bool isDurable() { return durable; }
             void maintenanceVisit ();
             uint nextChannel();

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1206351&r1=1206350&r2=1206351&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp Fri Nov 25 21:52:16 2011
@@ -1434,7 +1434,7 @@ bool Queue::bind(boost::shared_ptr<Excha
 }
 
 
-const Broker* Queue::getBroker()
+Broker* Queue::getBroker()
 {
     return broker;
 }

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.h?rev=1206351&r1=1206350&r2=1206351&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.h Fri Nov 25 21:52:16 2011
@@ -403,7 +403,7 @@ class Queue : public boost::enable_share
 
     void flush();
 
-    const Broker* getBroker();
+    Broker* getBroker();
 
     uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
     void setDequeueSincePurge(uint32_t value);

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1206351&r1=1206350&r2=1206351&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Nov 25 21:52:16
2011
@@ -27,7 +27,6 @@
 #include "qpid/broker/Message.h"
 #include "qpid/ha/WiringReplicator.h"
 #include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueReplicator.h"
 #include "qpid/broker/ReplicatingSubscription.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/SessionOutputException.h"
@@ -497,9 +496,10 @@ void SemanticState::route(intrusive_ptr<
     msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
 
     std::string exchangeName = msg->getExchangeName();
-    if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
{
-        cacheExchange = QueueReplicator::create(exchangeName, getSession().getBroker().getQueues());
-        if (!cacheExchange) cacheExchange = session.getBroker().getExchanges().get(exchangeName);
+    if (!cacheExchange || cacheExchange->getName() != exchangeName
+        || cacheExchange->isDestroyed())
+    {
+        cacheExchange = session.getBroker().getExchanges().get(exchangeName);
     }
     cacheExchange->setProperties(msg);
 

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1206351&r1=1206350&r2=1206351&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp Fri Nov 25 21:52:16 2011
@@ -42,9 +42,10 @@ using types::Variant;
 using std::string;
 
 Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
+    // FIXME aconway 2011-11-24: identifying the primary. Only has 1 address.
     if (s.brokerUrl != "dummy") { // FIXME aconway 2011-11-22: temporary hack to identify
primary.
         Url url(s.brokerUrl);
-        QPID_LOG(info, "HA backup broker connecting to: " << url);
+        QPID_LOG(info, "HA: Acting as backup to " << url);
         string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
 
         // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over.

Copied: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (from r1206350, qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueReplicator.cpp)
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?p2=qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp&p1=qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueReplicator.cpp&r1=1206350&r2=1206351&rev=1206351&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Nov 25 21:52:16 2011
@@ -18,18 +18,62 @@
  * under the License.
  *
  */
-#include "qpid/broker/QueueReplicator.h"
+
+#include "QueueReplicator.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Link.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/SessionHandler.h"
 #include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/FieldTable.h"
 #include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+
+namespace {
+const std::string QPID_REPLICATOR_("qpid.replicator-");
+}
 
 namespace qpid {
-namespace broker {
+namespace ha {
+using namespace broker;
+
+QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link>
l)
+    : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden
from management?
+      queue(q), link(l), current(queue->getPosition())
+{
+    // FIXME aconway 2011-11-24: consistent logging.
+    QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " <<
q->getSettings());
+    queue->getBroker()->getLinks().declare(
+        link->getHost(), link->getPort(),
+        false,              // durable
+        queue->getName(),   // src
+        getName(),          // dest
+        "",                 // key
+        false,              // isQueue
+        false,              // isLocal
+        "",                 // id/tag
+        "",                 // excludes
+        false,              // dynamic
+        0,                  // sync?
+        boost::bind(&QueueReplicator::initializeBridge, this, _1, _2)
+    );
+}
 
-QueueReplicator::QueueReplicator(const std::string& name, boost::shared_ptr<Queue>
q) : Exchange(name, 0, 0), queue(q), current(queue->getPosition()) {}
 QueueReplicator::~QueueReplicator() {}
 
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler)
{
+    framing::AMQP_ServerProxy peer(sessionHandler.out);
+    const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+    peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "",
0, framing::FieldTable());
+    peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
+    peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
+    QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " <<
args.i_dest);
+
+}
+
+
 namespace {
 const std::string DEQUEUE_EVENT("dequeue-event");
 const std::string REPLICATOR("qpid.replicator-");
@@ -78,23 +122,6 @@ bool QueueReplicator::isReplicatingLink(
     return name.find(REPLICATOR) == 0;
 }
 
-boost::shared_ptr<Exchange> QueueReplicator::create(const std::string& target,
QueueRegistry& queues)
-{
-    boost::shared_ptr<Exchange> exchange;
-    if (isReplicatingLink(target)) {
-        std::string queueName = target.substr(REPLICATOR.size());
-        boost::shared_ptr<Queue> queue = queues.find(queueName);
-        if (!queue) {
-            QPID_LOG(warning, "Unable to create replicator, can't find " << queueName);
-        } else {
-            //TODO: need to cache the replicator
-            QPID_LOG(info, "Creating replicator for " << queueName);
-            exchange.reset(new QueueReplicator(target, queue));
-        }
-    }
-    return exchange;
-}
-
 bool QueueReplicator::initReplicationSettings(const std::string& target, QueueRegistry&
queues, qpid::framing::FieldTable& settings)
 {
     if (isReplicatingLink(target)) {

Propchange: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h (from r1206350, qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueReplicator.h)
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h?p2=qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h&p1=qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueReplicator.h&r1=1206350&r2=1206351&rev=1206351&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueReplicator.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h Fri Nov 25 21:52:16 2011
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_QUEUEREPLICATOR_H
-#define QPID_BROKER_QUEUEREPLICATOR_H
+#ifndef QPID_HA_QUEUEREPLICATOR_H
+#define QPID_HA_QUEUEREPLICATOR_H
 
 /*
  *
@@ -25,33 +25,43 @@
 #include "qpid/framing/SequenceSet.h"
 
 namespace qpid {
-namespace broker {
 
+namespace broker {
+class Bridge;
+class Link;
+class Queue;
 class QueueRegistry;
+class SessionHandler;
+class Deliverable;
+}
+
+namespace ha {
 
 /**
  * Dummy exchange for processing replication messages
  */
-class QueueReplicator : public Exchange
+class QueueReplicator : public broker::Exchange
 {
   public:
-    QueueReplicator(const std::string& name, boost::shared_ptr<Queue>);
+    QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link>
l);
     ~QueueReplicator();
     std::string getType() const;
-    bool bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
-    bool unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
-    void route(Deliverable&, const std::string&, const qpid::framing::FieldTable*);
-    bool isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable*
const);
+    bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+    bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+    void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+    bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const
framing::FieldTable* const);
     static bool isReplicatingLink(const std::string&);
-    static boost::shared_ptr<Exchange> create(const std::string&, QueueRegistry&);
-    static bool initReplicationSettings(const std::string&, QueueRegistry&, qpid::framing::FieldTable&);
+    static bool initReplicationSettings(const std::string&, broker::QueueRegistry&,
framing::FieldTable&);
     static const std::string typeName;
   private:
-    boost::shared_ptr<Queue> queue;
-    qpid::framing::SequenceNumber current;
-    qpid::framing::SequenceSet dequeued;
+    void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
+
+    boost::shared_ptr<broker::Queue> queue;
+    boost::shared_ptr<broker::Link> link;
+    framing::SequenceNumber current;
+    framing::SequenceSet dequeued;
 };
 
-}} // namespace qpid::broker
+}} // namespace qpid::ha
 
-#endif  /*!QPID_BROKER_QUEUEREPLICATOR_H*/
+#endif  /*!QPID_HA_QUEUEREPLICATOR_H*/

Propchange: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1206351&r1=1206350&r2=1206351&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Fri Nov 25 21:52:16
2011
@@ -19,6 +19,7 @@
  *
  */
 #include "WiringReplicator.h"
+#include "QueueReplicator.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/Link.h"
@@ -62,7 +63,6 @@ const string QUERY_RESPONSE("_query_resp
 const string SCHEMA_ID("_schema_id");
 const string VALUES("_values");
 
-const string ALL("all");
 const string ALTEX("altEx");
 const string ARGS("args");
 const string ARGUMENTS("arguments");
@@ -83,7 +83,6 @@ const string QUEUE("queue");
 const string RHOST("rhost");
 const string TYPE("type");
 const string USER("user");
-const string WIRING("wiring");
 
 const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
 const string QMF2("qmf2");
@@ -110,15 +109,33 @@ template <class T> bool match(Variant::M
     return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
 }
 
-bool isReplicated(const string& value) {
-    return value == ALL || value == WIRING;
+// FIXME aconway 2011-11-24: this should be a class.
+enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL };
+const string S_NONE="none";
+const string S_WIRING="wiring";
+const string S_ALL="all";
+
+ReplicateLevel replicateLevel(const string& str) {
+    // FIXME aconway 2011-11-24: case insenstive comparison.
+    QPID_LOG(critical, "FIXME replicateLevel " << str);
+    ReplicateLevel rl = RL_NONE;
+    if (str == S_WIRING) rl = RL_WIRING;
+    else if (str == S_ALL) rl = RL_ALL;
+    QPID_LOG(critical, "FIXME replicateLevel " << str << " = " << rl);
+    return rl;
+}
+
+ReplicateLevel replicateLevel(const framing::FieldTable& f) {
+    QPID_LOG(critical, "FIXME replicateLevel " << f);
+    if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE));
+    else return RL_NONE;
 }
-bool isReplicated(const framing::FieldTable& f) {
-    return f.isSet(QPID_REPLICATE) && isReplicated(f.getAsString(QPID_REPLICATE));
-}
-bool isReplicated(const Variant::Map& m) {
+
+ReplicateLevel replicateLevel(const Variant::Map& m) {
+    QPID_LOG(critical, "FIXME replicateLevel " << m);
     Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
-    return i != m.end() && isReplicated(i->second.asString());
+    if (i != m.end()) return replicateLevel(i->second.asString());
+    else return RL_NONE;
 }
 
 void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler)
{
@@ -164,6 +181,8 @@ WiringReplicator::~WiringReplicator() {}
 WiringReplicator::WiringReplicator(const boost::shared_ptr<Link>& l)
     : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l)
 {
+    QPID_LOG(debug, "HA: Starting replication from " <<
+             link->getTransport() << ":" << link->getHost() << ":"
<< link->getPort());
     broker.getLinks().declare(
         link->getHost(), link->getPort(),
         false,              // durable
@@ -198,6 +217,7 @@ void WiringReplicator::initializeBridge(
     sendQuery(QUEUE, queueName, sessionHandler);
     sendQuery(EXCHANGE, queueName, sessionHandler);
     sendQuery(BINDING, queueName, sessionHandler);
+    QPID_LOG(debug, "Activated wiring replicator")
 }
 
 void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable*
headers) {
@@ -244,13 +264,15 @@ void WiringReplicator::route(Deliverable
 }
 
 void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
+    QPID_LOG(critical, "FIXME doEventQueueDeclare " << values);
     string name = values[QNAME].asString();
     Variant::Map argsMap = values[ARGS].asMap();
-    if (values[DISP] == CREATED && isReplicated(argsMap)) {
-        QPID_LOG(debug, "Creating replicated queue " << name);
+    if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+        QPID_LOG(debug, "HA: Creating replicated queue " << name);
         framing::FieldTable args;
         amqp_0_10::translate(argsMap, args);
-        if (!broker.createQueue(
+        std::pair<boost::shared_ptr<Queue>, bool> result =
+            broker.createQueue(
                 name,
                 values[DURABLE].asBool(),
                 values[AUTODEL].asBool(),
@@ -258,11 +280,14 @@ void WiringReplicator::doEventQueueDecla
                 values[ALTEX].asString(),
                 args,
                 values[USER].asString(),
-                values[RHOST].asString()).second) {
+                values[RHOST].asString());
+        if (result.second) {
             // FIXME aconway 2011-11-22: should delete old queue and
             // re-create from event.
             // Events are always up to date, whereas responses may be
             // out of date.
+            startQueueReplicator(result.first);
+        } else {
             QPID_LOG(warning, "Replicated queue " << name << " already exists");
         }
     }
@@ -271,7 +296,7 @@ void WiringReplicator::doEventQueueDecla
 void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
     string name = values[QNAME].asString();
     boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
-    if (queue && isReplicated(queue->getSettings())) {
+    if (queue && replicateLevel(queue->getSettings())) {
         QPID_LOG(debug, "Deleting replicated queue " << name);
         broker.deleteQueue(
             name,
@@ -282,7 +307,7 @@ void WiringReplicator::doEventQueueDelet
 
 void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) {
     Variant::Map argsMap(values[ARGS].asMap());
-    if (values[DISP] == CREATED && isReplicated(argsMap)) {
+    if (values[DISP] == CREATED && replicateLevel(argsMap)) {
         string name = values[EXNAME].asString();
         framing::FieldTable args;
         amqp_0_10::translate(argsMap, args);
@@ -305,7 +330,7 @@ void WiringReplicator::doEventExchangeDe
     string name = values[EXNAME].asString();
     try {
         boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
-        if (exchange && isReplicated(exchange->getArgs())) {
+        if (exchange && replicateLevel(exchange->getArgs())) {
             QPID_LOG(debug, "Deleting replicated exchange " << name);
             broker.deleteExchange(
                 name,
@@ -320,7 +345,7 @@ void WiringReplicator::doEventBind(Varia
         boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(values[EXNAME].asString());
         boost::shared_ptr<Queue> queue = broker.getQueues().find(values[QNAME].asString());
         // We only replicated a binds for a replicated queue to replicated exchange.
-        if (isReplicated(exchange->getArgs()) && isReplicated(queue->getSettings()))
{
+        if (replicateLevel(exchange->getArgs()) && replicateLevel(queue->getSettings()))
{
             framing::FieldTable args;
             amqp_0_10::translate(values[ARGS].asMap(), args);
             string key = values[KEY].asString();
@@ -333,21 +358,28 @@ void WiringReplicator::doEventBind(Varia
 }
 
 void WiringReplicator::doResponseQueue(Variant::Map& values) {
+    QPID_LOG(critical, "FIXME doResponseQueue " << values);
     // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
     Variant::Map argsMap(values[ARGUMENTS].asMap());
-    if (!isReplicated(argsMap)) return;
+    QPID_LOG(critical, "FIXME doResponseQueue replevel " << replicateLevel(argsMap));
+    if (!replicateLevel(argsMap)) return;
     framing::FieldTable args;
     amqp_0_10::translate(argsMap, args);
+    string name(values[NAME].asString());
     QPID_LOG(debug, "Creating replicated queue " << values[NAME].asString() <<
" (in catch-up)");
-    if (!broker.createQueue(
-            values[NAME].asString(),
+    std::pair<boost::shared_ptr<Queue>, bool> result =
+        broker.createQueue(
+            name,
             values[DURABLE].asBool(),
             values[AUTODELETE].asBool(),
             0 /*i.e. no owner regardless of exclusivity on master*/,
             ""/*TODO: need to include alternate-exchange*/,
             args,
             ""/*TODO: who is the user?*/,
-            ""/*TODO: what should we use as connection id?*/).second) {
+            ""/*TODO: what should we use as connection id?*/);
+    if (result.second) {
+        startQueueReplicator(result.first);
+    } else {
         // FIXME aconway 2011-11-22: Normal to find queue already
         // exists if we're failing over.
         QPID_LOG(warning, "Replicated queue " << values[NAME] << " already exists
(in catch-up)");
@@ -356,7 +388,7 @@ void WiringReplicator::doResponseQueue(V
 
 void WiringReplicator::doResponseExchange(Variant::Map& values) {
     Variant::Map argsMap(values[ARGUMENTS].asMap());
-    if (!isReplicated(argsMap)) return;
+    if (!replicateLevel(argsMap)) return;
     framing::FieldTable args;
     amqp_0_10::translate(argsMap, args);
     QPID_LOG(debug, "Creating replicated exchange " << values[NAME].asString() <<
" (in catch-up)");
@@ -396,23 +428,21 @@ const std::string QUEUE_REF("queueRef");
 void WiringReplicator::doResponseBind(Variant::Map& values) {
     try {
         std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
-        boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(exName);
-        if (!exchange) return;
-
         std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
+        QPID_LOG(critical, "FIXME doResponseBind " << qName << " to " <<
exName);
+        boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(exName);
         boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
-        if (!queue) return;
+        QPID_LOG(critical, "FIXME doResponseBind ptrs " << queue.get() << " to
"  << exchange.get());
+        // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
 
-        // We only replicated a bind for a replicated queue to replicated exchange.
-        // FIXME aconway 2011-11-22: do we always log binds between replicated ex/q
-        // or do we consider the bind arguments as well?
-        if (exchange && queue &&
-            isReplicated(exchange->getArgs()) && isReplicated(queue->getSettings()))
+        // Automatically replicate exchange if queue and exchange are replicated
+        if (exchange && replicateLevel(exchange->getArgs()) &&
+            queue && replicateLevel(queue->getSettings()))
         {
             framing::FieldTable args;
             amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
             string key = values[KEY].asString();
-            QPID_LOG(debug, "Replicated binding exchange=" << exchange->getName()
+            QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
                      << " queue=" << queue->getName()
                      << " key=" << key);
             exchange->bind(queue, key, &args);
@@ -420,6 +450,15 @@ void WiringReplicator::doResponseBind(Va
     } catch (const framing::NotFoundException& e) {} // Ignore unreplicated queue or
exchange.
 }
 
+void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue)
{
+    QPID_LOG(critical, "FIXME startQueueReplicator " << queue->getName() <<
" " << queue->getSettings());
+    if (replicateLevel(queue->getSettings()) == RL_ALL) {
+        QPID_LOG(critical, "FIXME startQueueReplicator starting");
+        boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+        broker.getExchanges().registerExchange(qr);
+    }
+}
+
 bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*)
{ return false; }
 bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*)
{ return false; }
 bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const
framing::FieldTable* const) { return false; }

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.h?rev=1206351&r1=1206350&r2=1206351&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.h Fri Nov 25 21:52:16 2011
@@ -68,7 +68,7 @@ class WiringReplicator : public broker::
     void doResponseBind(types::Variant::Map& values);
 
   private:
-    void startQueueReplicator(const std::string& name);
+    void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
 
     broker::Broker& broker;
     boost::shared_ptr<broker::Link> link;

Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/brokertest.py?rev=1206351&r1=1206350&r2=1206351&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/tests/brokertest.py Fri Nov 25 21:52:16 2011
@@ -515,6 +515,12 @@ class BrokerTest(TestCase):
         actual_contents = self.browse(session, queue, timeout)
         self.assertEqual(expect_contents, actual_contents)
 
+    def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01):
+        """Wait up to timeout for contents of queue to match expect_contents"""
+        def test(): return self.browse(session, queue, 0) == expect_contents
+        retry(test, timeout, delay)
+        self.assertEqual(expect_contents, self.browse(session, queue, 0))
+
 def join(thread, timeout=10):
     thread.join(timeout)
     if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)

Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py?rev=1206351&r1=1206350&r2=1206351&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py Fri Nov 25 21:52:16 2011
@@ -57,50 +57,52 @@ class ShortTests(BrokerTest):
 
     def assert_missing(self,session, address):
         try:
-            session.receiver(a)
+            session.receiver(address)
             self.fail("Should not have been replicated: %s"%(address))
         except NotFound: pass
 
-    def test_replicate_wiring(self):
-        queue="%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"
-        exchange="%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s},
type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"
+    def test_replication(self):
+        def queue(name, replicate):
+            return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name,
replicate)
+
+        def exchange(name, replicate, bindq):
+            return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s},
type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)
+        def setup(p, prefix):
+            """Create config, send messages on the primary p"""
+            p.sender(queue(prefix+"q1", "all")).send(Message("1"))
+            p.sender(queue(prefix+"q2", "wiring")).send(Message("2"))
+            p.sender(queue(prefix+"q3", "none")).send(Message("3"))
+            p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4"))
+            p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5"))
+            # FIXME aconway 2011-11-24: need a marker so we can wait till sync is done.
+            p.sender(queue(prefix+"x", "wiring"))
+
+        def verify(b, prefix):
+            """Verify setup was replicated to backup b"""
+            # FIXME aconway 2011-11-21: wait for wiring to replicate.
+            self.wait(b, prefix+"x");
+            # Verify backup
+            # FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication.
+            self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
+            self.assert_browse_retry(b, prefix+"q2", []) # wiring only
+            self.assert_missing(b, prefix+"q3")
+            b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
+            self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
+            b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=wiring
+            self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
 
-        # Create some wiring before starting the backup, to test catch-up
+        # Create config, send messages before starting the backup, to test catch-up replication.
         primary = self.ha_broker(name="primary")
         p = primary.connect().session()
-        p.sender(queue%("q1", "all")).send(Message("1"))
-        p.sender(queue%("q2", "wiring")).send(Message("2"))
-        p.sender(queue%("q3", "none")).send(Message("3"))
-        p.sender(exchange%("e1", "all", "e1", "q2")).send(Message("4"))
-
-        # Create some after starting backup, test steady-state replication
+        setup(p, "1")
+        # Start the backup
         backup  = self.ha_broker(name="backup", broker_url=primary.host_port())
         b = backup.connect().session()
-        # FIXME aconway 2011-11-21: need to wait for backup to be ready to test event replication
-        for a in ["q1", "q2", "e1"]: self.wait(b,a)
-        p.sender(queue%("q11", "all")).send(Message("11"))
-        p.sender(queue%("q12", "wiring")).send(Message("12"))
-        p.sender(queue%("q13", "none")).send(Message("13"))
-        p.sender(exchange%("e11", "all", "e11", "q12")).send(Message("14"))
-
-        # Verify replication
-        # FIXME aconway 2011-11-18: We should kill primary here and fail over.
-        for a in ["q11", "q12", "e11"]: self.wait(b,a)
-        # FIXME aconway 2011-11-18: replicate messages
-#         self.assert_browse(b, "q11", ["11", "14", "e11"])
-#         self.assert_browse(b, "q12", []) # wiring only
-#         self.assert_missing(b,"q13")
-        b.sender("e11").send(Message("e11")) # Verify bind
-        self.assert_browse(b, "q12", ["e11"])
-
-        for a in ["q1", "q2", "e1"]: self.wait(b,a)
-        # FIXME aconway 2011-11-18: replicate messages
-#         self.assert_browse(b, "q1", ["1", "4", "e1"])
-#         self.assert_browse(b, "q2", []) # wiring only
-#         self.assert_missing(b,"q3")
-        b.sender("e1").send(Message("e1")) # Verify bind
-        self.assert_browse(b, "q2", ["e1"])
+        verify(b, "1")
 
+        # Create config, send messages after starting the backup, to test steady-state replication.
+        setup(p, "2")
+        verify(b, "2")
 
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message