Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AC1D598AF for ; Fri, 2 Dec 2011 21:03:52 +0000 (UTC) Received: (qmail 3572 invoked by uid 500); 2 Dec 2011 21:03:52 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 3556 invoked by uid 500); 2 Dec 2011 21:03:52 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 3544 invoked by uid 99); 2 Dec 2011 21:03:52 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Dec 2011 21:03:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Dec 2011 21:03:46 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 328B423888E7 for ; Fri, 2 Dec 2011 21:03:25 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1209690 - in /qpid/branches/qpid-3603/qpid/cpp/src: qpid/broker/ qpid/ha/ tests/ Date: Fri, 02 Dec 2011 21:03:24 -0000 To: commits@qpid.apache.org From: aconway@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111202210325.328B423888E7@eris.apache.org> Author: aconway Date: Fri Dec 2 21:03:23 2011 New Revision: 1209690 URL: http://svn.apache.org/viewvc?rev=1209690&view=rev Log: QPID-3603: Fix replication of dequeues. - Set acquire=false when creating a ReplicatingSubscription. - Cleaned up string literals & other cosmetic improvemets. - Consistent find/get for broker::QueueRegistry and ExchangeRegistry. Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ExchangeRegistry.h qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueRegistry.cpp qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueRegistry.h qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1209690&r1=1209689&r2=1209690&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Fri Dec 2 21:03:23 2011 @@ -87,12 +87,19 @@ void ExchangeRegistry::destroy(const str } } -Exchange::shared_ptr ExchangeRegistry::get(const string& name){ +Exchange::shared_ptr ExchangeRegistry::find(const string& name){ RWlock::ScopedRlock locker(lock); ExchangeMap::iterator i = exchanges.find(name); if (i == exchanges.end()) - throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name)); - return i->second; + return Exchange::shared_ptr(); + else + return i->second; +} + +Exchange::shared_ptr ExchangeRegistry::get(const string& name) { + Exchange::shared_ptr ex = find(name); + if (!ex) throw framing::NotFoundException(QPID_MSG("Exchange not found: "< find(const std::string& name); + + /** + * Get the named exchange. Throw exception if not found. + */ + QPID_BROKER_EXTERN boost::shared_ptr get(const std::string& name); + + + /** * Register the manageable parent for declared exchanges */ void setParent (management::Manageable* _parent) { parent = _parent; } Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueRegistry.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1209690&r1=1209689&r2=1209690&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Fri Dec 2 21:03:23 2011 @@ -23,6 +23,7 @@ #include "qpid/broker/QueueEvents.h" #include "qpid/broker/Exchange.h" #include "qpid/log/Statement.h" +#include "qpid/framing/reply_exceptions.h" #include #include @@ -84,7 +85,6 @@ void QueueRegistry::destroy (const strin Queue::shared_ptr QueueRegistry::find(const string& name){ RWlock::ScopedRlock locker(lock); QueueMap::iterator i = queues.find(name); - if (i == queues.end()) { return Queue::shared_ptr(); } else { @@ -92,6 +92,12 @@ Queue::shared_ptr QueueRegistry::find(co } } +Queue::shared_ptr QueueRegistry::get(const string& name) { + Queue::shared_ptr q = find(name); + if (!q) throw framing::NotFoundException(QPID_MSG("Queue not found: "< find(const std::string& name); /** + * Get the named queue. Throw exception if not found. + */ + QPID_BROKER_EXTERN boost::shared_ptr get(const std::string& name); + + /** * Generate unique queue name. */ std::string generateName(); Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1209690&r1=1209689&r2=1209690&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Dec 2 21:03:23 2011 @@ -35,12 +35,16 @@ namespace { const std::string QPID_REPLICATOR_("qpid.replicator-"); +const std::string TYPE_NAME("qpid.queue-replicator"); } namespace qpid { namespace ha { using namespace broker; +// FIXME aconway 2011-12-02: separate file for string constantS? +const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event"); + QueueReplicator::QueueReplicator(boost::shared_ptr q, boost::shared_ptr l) : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management? queue(q), link(l), current(queue->getPosition()) @@ -72,9 +76,7 @@ void QueueReplicator::initializeBridge(B framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); framing::FieldTable settings; - // FIXME aconway 2011-11-28: string constants. settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); - // FIXME aconway 2011-11-28: inconsistent use of _ vs. - settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition()); qpid::framing::SequenceNumber oldest; if (queue->getOldest(oldest)) @@ -86,15 +88,9 @@ void QueueReplicator::initializeBridge(B QPID_LOG(debug, "HA: Activated route from queue " << args.i_src << " to " << args.i_dest); } - -namespace { -const std::string DEQUEUE_EVENT("dequeue-event"); -const std::string REPLICATOR("qpid.replicator-"); -} - void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/) { - if (key == DEQUEUE_EVENT) { + if (key == DEQUEUE_EVENT_KEY) { std::string content; msg.getMessage().getFrames().getContent(content); qpid::framing::Buffer buffer(const_cast(content.c_str()), content.size()); @@ -115,6 +111,7 @@ void QueueReplicator::route(Deliverable& QPID_LOG(info, "HA: Dequeued message "<< QueuePos(message)); } else { // FIXME aconway 2011-11-29: error handling + // Is this an error? Will happen if queue has initial dequeues. QPID_LOG(error, "HA: Unable to dequeue message at " << QueuePos(queue.get(), *i)); } @@ -136,10 +133,6 @@ void QueueReplicator::route(Deliverable& bool QueueReplicator::bind(boost::shared_ptr, const std::string&, const qpid::framing::FieldTable*) { return false; } bool QueueReplicator::unbind(boost::shared_ptr, const std::string&, const qpid::framing::FieldTable*) { return false; } bool QueueReplicator::isBound(boost::shared_ptr, const std::string* const, const qpid::framing::FieldTable* const) { return false; } - -// FIXME aconway 2011-11-28: rationalise string constants. -static const std::string TYPE_NAME("qpid.queue-replicator"); - std::string QueueReplicator::getType() const { return TYPE_NAME; } }} // namespace qpid::broker Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1209690&r1=1209689&r2=1209690&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h Fri Dec 2 21:03:23 2011 @@ -49,6 +49,8 @@ namespace ha { class QueueReplicator : public broker::Exchange { public: + static const std::string DEQUEUE_EVENT_KEY; + QueueReplicator(boost::shared_ptr q, boost::shared_ptr l); ~QueueReplicator(); std::string getType() const; Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1209690&r1=1209689&r2=1209690&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Dec 2 21:03:23 2011 @@ -36,11 +36,13 @@ using namespace std; // FIXME aconway 2011-11-28: review all arugment names, prefixes etc. // Do we want a common HA prefix? const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription"); -const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high_sequence_number"); -const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low_sequence_number"); +const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number"); +const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number"); +namespace { const string DOLLAR("$"); const string INTERNAL("-internal"); +} // namespace class ReplicationStateInitialiser { @@ -80,15 +82,23 @@ ReplicatingSubscription::Factory::create const string& name, Queue::shared_ptr queue, bool ack, - bool acquire, + bool /*acquire*/, bool exclusive, const string& tag, const string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments ) { - return boost::shared_ptr( - new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); + boost::shared_ptr rs; + if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) { + // FIXME aconway 2011-12-01: ignoring acquire param and setting acquire + // false. Should this be done in the caller? Remove from ctor parameters. + rs.reset(new ReplicatingSubscription( + parent, name, queue, ack, false, exclusive, tag, + resumeId, resumeTtl, arguments)); + queue->addObserver(rs); + } + return rs; } ReplicatingSubscription::ReplicatingSubscription( @@ -108,12 +118,11 @@ ReplicatingSubscription::ReplicatingSubs consumer(new DelegatingConsumer(*this)) { QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName()); - // FIXME aconway 2011-11-25: string constants. - if (arguments.isSet("qpid.high_sequence_number")) { - qpid::framing::SequenceNumber hwm = arguments.getAsInt("qpid.high_sequence_number"); + if (arguments.isSet(QPID_HIGH_SEQUENCE_NUMBER)) { + qpid::framing::SequenceNumber hwm = arguments.getAsInt(QPID_HIGH_SEQUENCE_NUMBER); qpid::framing::SequenceNumber lwm; - if (arguments.isSet("qpid.low_sequence_number")) { - lwm = arguments.getAsInt("qpid.low_sequence_number"); + if (arguments.isSet(QPID_LOW_SEQUENCE_NUMBER)) { + lwm = arguments.getAsInt(QPID_LOW_SEQUENCE_NUMBER); } else { lwm = hwm; } @@ -159,6 +168,7 @@ void ReplicatingSubscription::enqueued(c m.payload->getIngressCompletion().startCompleter(); } +// Called with lock held. void ReplicatingSubscription::generateDequeueEvent() { string buf(range.encodedSize(),'\0'); @@ -186,11 +196,14 @@ void ReplicatingSubscription::generateDe event->getFrames().append(content); DeliveryProperties* props = event->getFrames().getHeaders()->get(true); - props->setRoutingKey("dequeue-event"); - + props->setRoutingKey(QueueReplicator::DEQUEUE_EVENT_KEY); events->deliver(event); } +// FIXME aconway 2011-12-02: is it safe to defer dequues to doDispatch() like this? +// If a queue is drained with no new messages coming on +// will the messages be dequeued on the backup? + //called after the message has been removed from the deque and under //the message lock in the queue void ReplicatingSubscription::dequeued(const QueuedMessage& m) Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1209690&r1=1209689&r2=1209690&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Dec 2 21:03:23 2011 @@ -22,6 +22,7 @@ * */ +#include "QueueReplicator.h" // For DEQUEUE_EVENT_KEY #include "qpid/broker/SemanticState.h" #include "qpid/broker/QueueObserver.h" #include "qpid/broker/ConsumerFactory.h" Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1209690&r1=1209689&r2=1209690&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Fri Dec 2 21:03:23 2011 @@ -109,7 +109,6 @@ template bool match(Variant::M return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); } -// FIXME aconway 2011-11-24: this should be a class. enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL }; const string S_NONE="none"; const string S_WIRING="wiring"; @@ -216,6 +215,7 @@ void WiringReplicator::initializeBridge( QPID_LOG(debug, "HA: Activated wiring replicator") } +// FIXME aconway 2011-12-02: error handling in route. Be forging but log warnings? void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) { Variant::List list; try { @@ -329,7 +329,7 @@ void WiringReplicator::doEventExchangeDe void WiringReplicator::doEventExchangeDelete(Variant::Map& values) { string name = values[EXNAME].asString(); try { - boost::shared_ptr exchange = broker.getExchanges().get(name); + boost::shared_ptr exchange = broker.getExchanges().find(name); if (exchange && replicateLevel(exchange->getArgs())) { QPID_LOG(debug, "HA: Deleting exchange:" << name); broker.deleteExchange( @@ -341,20 +341,23 @@ void WiringReplicator::doEventExchangeDe } void WiringReplicator::doEventBind(Variant::Map& values) { - try { - boost::shared_ptr exchange = broker.getExchanges().get(values[EXNAME].asString()); - boost::shared_ptr queue = broker.getQueues().find(values[QNAME].asString()); - // We only replicated a binds for a replicated queue to replicated exchange. - if (replicateLevel(exchange->getArgs()) && replicateLevel(queue->getSettings())) { - framing::FieldTable args; - amqp_0_10::translate(values[ARGS].asMap(), args); - string key = values[KEY].asString(); - QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName() - << " queue=" << queue->getName() - << " key=" << key); - exchange->bind(queue, key, &args); - } - } catch (const framing::NotFoundException&) {} // Ignore unreplicated queue or exchange. + boost::shared_ptr exchange = + broker.getExchanges().find(values[EXNAME].asString()); + boost::shared_ptr queue = + broker.getQueues().find(values[QNAME].asString()); + // We only replicate binds for a replicated queue to replicated + // exchange that both exist locally. + if (exchange && replicateLevel(exchange->getArgs()) && + queue && replicateLevel(queue->getSettings())) + { + framing::FieldTable args; + amqp_0_10::translate(values[ARGS].asMap(), args); + string key = values[KEY].asString(); + QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + exchange->bind(queue, key, &args); + } } void WiringReplicator::doResponseQueue(Variant::Map& values) { @@ -424,26 +427,24 @@ const std::string QUEUE_REF("queueRef"); } // namespace void WiringReplicator::doResponseBind(Variant::Map& values) { - try { - std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); - std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); - boost::shared_ptr exchange = broker.getExchanges().get(exName); - boost::shared_ptr queue = broker.getQueues().find(qName); - // FIXME aconway 2011-11-24: more flexible configuration for binding replication. - - // Automatically replicate exchange if queue and exchange are replicated - if (exchange && replicateLevel(exchange->getArgs()) && - queue && replicateLevel(queue->getSettings())) - { - framing::FieldTable args; - amqp_0_10::translate(values[ARGUMENTS].asMap(), args); - string key = values[KEY].asString(); - QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName() - << " queue=" << queue->getName() - << " key=" << key); - exchange->bind(queue, key, &args); - } - } catch (const framing::NotFoundException& e) {} // Ignore unreplicated queue or exchange. + std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); + std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); + boost::shared_ptr exchange = broker.getExchanges().find(exName); + boost::shared_ptr queue = broker.getQueues().find(qName); + // FIXME aconway 2011-11-24: more flexible configuration for binding replication. + + // Automatically replicate binding if queue and exchange exist and are replicated + if (exchange && replicateLevel(exchange->getArgs()) && + queue && replicateLevel(queue->getSettings())) + { + framing::FieldTable args; + amqp_0_10::translate(values[ARGUMENTS].asMap(), args); + string key = values[KEY].asString(); + QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + exchange->bind(queue, key, &args); + } } void WiringReplicator::startQueueReplicator(const boost::shared_ptr& queue) { Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py?rev=1209690&r1=1209689&r2=1209690&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py Fri Dec 2 21:03:23 2011 @@ -62,7 +62,11 @@ class ShortTests(BrokerTest): return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq) def setup(p, prefix): """Create config, send messages on the primary p""" - p.sender(queue(prefix+"q1", "all")).send(Message("1")) + s = p.sender(queue(prefix+"q1", "all")) + for m in ["a", "b", "1"]: s.send(Message(m)) + # Test replication of dequeue + self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a") + p.acknowledge() p.sender(queue(prefix+"q2", "wiring")).send(Message("2")) p.sender(queue(prefix+"q3", "none")).send(Message("3")) p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4")) @@ -70,13 +74,18 @@ class ShortTests(BrokerTest): # FIXME aconway 2011-11-24: need a marker so we can wait till sync is done. p.sender(queue(prefix+"x", "wiring")) - def verify(b, prefix): + def verify(b, prefix, p): """Verify setup was replicated to backup b""" + # FIXME aconway 2011-11-21: wait for wiring to replicate. self.wait(b, prefix+"x"); - # Verify backup # FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication. - self.assert_browse_retry(b, prefix+"q1", ["1", "4"]) + self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"]) + + # FIXME aconway 2011-12-02: + self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b") + p.acknowledge() + self.assert_browse_retry(b, prefix+"q2", []) # wiring only self.assert_missing(b, prefix+"q3") b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all @@ -84,18 +93,18 @@ class ShortTests(BrokerTest): b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=wiring self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"]) - # Create config, send messages before starting the backup, to test catch-up replication. primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary p = primary.connect().session() + # Create config, send messages before starting the backup, to test catch-up replication. setup(p, "1") - # Start the backup backup = self.ha_broker(name="backup", broker_url=primary.host_port()) - b = backup.connect().session() - verify(b, "1") - # Create config, send messages after starting the backup, to test steady-state replication. setup(p, "2") - verify(b, "2") + + # Verify the data on the backup + b = backup.connect().session() + verify(b, "1", p) + verify(b, "2", p) if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org