Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BA7E07A98 for ; Wed, 21 Dec 2011 22:35:14 +0000 (UTC) Received: (qmail 82092 invoked by uid 500); 21 Dec 2011 22:35:14 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 82072 invoked by uid 500); 21 Dec 2011 22:35:14 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 82065 invoked by uid 99); 21 Dec 2011 22:35:14 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Dec 2011 22:35:14 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Dec 2011 22:35:12 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id BD1CF238897D for ; Wed, 21 Dec 2011 22:34:51 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1221920 - in /qpid/branches/qpid-3603/qpid/cpp/src: ./ qpid/broker/ qpid/ha/ tests/ Date: Wed, 21 Dec 2011 22:34:51 -0000 To: commits@qpid.apache.org From: aconway@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111221223451.BD1CF238897D@eris.apache.org> Author: aconway Date: Wed Dec 21 22:34:50 2011 New Revision: 1221920 URL: http://svn.apache.org/viewvc?rev=1221920&view=rev Log: QPID-3603: Clean up HA log messages. - Reduce verbosity, drop unknown event messages. - Lots of clarifications - Fix minor test bug in ha_tests.py. 
Removed: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Logging.cpp qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Logging.h Modified: qpid/branches/qpid-3603/qpid/cpp/src/ha.mk qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py Modified: qpid/branches/qpid-3603/qpid/cpp/src/ha.mk URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/ha.mk?rev=1221920&r1=1221919&r2=1221920&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/ha.mk (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/ha.mk Wed Dec 21 22:34:50 2011 @@ -28,8 +28,6 @@ ha_la_SOURCES = \ qpid/ha/HaBroker.cpp \ qpid/ha/HaBroker.h \ qpid/ha/HaPlugin.cpp \ - qpid/ha/Logging.h \ - qpid/ha/Logging.cpp \ qpid/ha/Settings.h \ qpid/ha/QueueReplicator.h \ qpid/ha/QueueReplicator.cpp \ Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1221920&r1=1221919&r2=1221920&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp Wed Dec 21 22:34:50 2011 @@ -1323,7 +1323,7 @@ void Queue::query(qpid::types::Variant:: void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); sequence = n; - QPID_LOG(info, "Set position to " << sequence << " on " << getName()); + QPID_LOG(trace, "Set 
position to " << sequence << " on " << getName()); } SequenceNumber Queue::getPosition() { Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1221920&r1=1221919&r2=1221920&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp Wed Dec 21 22:34:50 2011 @@ -46,7 +46,7 @@ Url url(const std::string& s, const std: } // namespace HaBroker::HaBroker(broker::Broker& b, const Settings& s) - : broker(b), + : broker(b), clientUrl(url(s.clientUrl, "ha-client-url")), brokerUrl(url(s.brokerUrl, "ha-broker-url")), mgmtObject(0) @@ -59,17 +59,17 @@ HaBroker::HaBroker(broker::Broker& b, co mgmtObject->set_status("solo"); ma->addObject(mgmtObject); } - QPID_LOG(notice, "HA: Initialized: client-url=" << clientUrl - << " broker-url=" << brokerUrl); // FIXME aconway 2011-11-22: temporary hack to identify primary. - if (s.brokerUrl != "primary") - backup.reset(new Backup(broker, s)); + bool isPrimary = (s.brokerUrl == "primary"); + QPID_LOG(notice, "HA: " << (isPrimary ? "Primary" : "Backup") + << " initialized: client-url=" << clientUrl + << " broker-url=" << brokerUrl); + if (!isPrimary) backup.reset(new Backup(broker, s)); // Register a factory for replicating subscriptions. 
broker.getConsumerFactories().add( boost::shared_ptr( new ReplicatingSubscription::Factory())); } - HaBroker::~HaBroker() {} Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) { Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1221920&r1=1221919&r2=1221920&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Wed Dec 21 22:34:50 2011 @@ -21,7 +21,6 @@ #include "QueueReplicator.h" #include "ReplicatingSubscription.h" -#include "Logging.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" @@ -32,6 +31,7 @@ #include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" #include +#include namespace { const std::string QPID_REPLICATOR_("qpid.replicator-"); @@ -47,12 +47,19 @@ using namespace framing; const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event"); const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event"); +std::string QueueReplicator::replicatorName(const std::string& queueName) { + return QPID_REPLICATOR_ + queueName; +} + +std::ostream& operator<<(std::ostream& o, const QueueReplicator& qr) { + return o << "HA: Backup queue " << qr.queue->getName() << ": "; +} + QueueReplicator::QueueReplicator(boost::shared_ptr q, boost::shared_ptr l) - : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management? - queue(q), link(l) + : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l) { - QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings()); - // Declare the replicator bridge. 
+ QPID_LOG(info, *this << "Created, settings: " << q->getSettings()); + queue->getBroker()->getLinks().declare( link->getHost(), link->getPort(), false, // durable @@ -69,12 +76,15 @@ QueueReplicator::QueueReplicator(boost:: ); } -QueueReplicator::~QueueReplicator() {} +QueueReplicator::~QueueReplicator() { + // FIXME aconway 2011-12-21: causes race condition? Restore. +// queue->getBroker()->getLinks().destroy( +// link->getHost(), link->getPort(), queue->getName(), getName(), string()); +} -// NB: This is called back ina broker connection thread when the -// bridge is created. -void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { - // No lock needed, no mutable member variables are used. +// Called in a broker connection thread when the bridge is created. +void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) +{ framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); framing::FieldTable settings; @@ -91,11 +101,12 @@ void QueueReplicator::initializeBridge(B queue->setPosition(0); settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); + // TODO aconway 2011-12-19: optimize. 
settings.setInt(QPID_SYNC_FREQUENCY, 1); peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false, "", 0, settings); peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); - QPID_LOG(debug, "HA: Backup activated bridge from " << args.i_src << " to " << args.i_dest); + QPID_LOG(debug, *this << "Activated bridge from " << args.i_src << " to " << args.i_dest); } namespace { @@ -115,34 +126,37 @@ void QueueReplicator::dequeue(SequenceNu QueuedMessage message; if (queue->acquireMessageAt(n, message)) { queue->dequeue(0, message); - QPID_LOG(trace, "HA: Backup dequeued: "<< QueuePos(message)); + QPID_LOG(trace, *this << "Dequeued message "<< message.position); } } } -void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable* /*args*/) +// Called in connection thread of the queue's bridge to primary. +void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable*) { sys::Mutex::ScopedLock l(lock); if (key == DEQUEUE_EVENT_KEY) { SequenceSet dequeues = decodeContent(msg.getMessage()); - QPID_LOG(trace, "HA: Backup received dequeues: " << dequeues); + QPID_LOG(trace, *this << "Received dequeues: " << dequeues); //TODO: should be able to optimise the following for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) dequeue(*i, l); } else if (key == POSITION_EVENT_KEY) { SequenceNumber position = decodeContent(msg.getMessage()); + QPID_LOG(trace, *this << "Advance position: from " << queue->getPosition() + << " to " << position); assert(queue->getPosition() <= position); //TODO aconway 2011-12-14: Optimize this?
for (SequenceNumber i = queue->getPosition(); i < position; ++i) dequeue(i,l); queue->setPosition(position); - QPID_LOG(trace, "HA: Backup advanced to: " << QueuePos(queue.get(), queue->getPosition())); } else { - QPID_LOG(trace, "HA: Backup enqueued message: " << QueuePos(queue.get(), queue->getPosition()+1)); msg.deliverTo(queue); + QPID_LOG(trace, *this << "Enqueued message " << queue->getPosition()); } } +// Unused Exchange methods. bool QueueReplicator::bind(boost::shared_ptr, const std::string&, const FieldTable*) { return false; } bool QueueReplicator::unbind(boost::shared_ptr, const std::string&, const FieldTable*) { return false; } bool QueueReplicator::isBound(boost::shared_ptr, const std::string* const, const FieldTable* const) { return false; } Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1221920&r1=1221919&r2=1221920&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h Wed Dec 21 22:34:50 2011 @@ -23,6 +23,7 @@ */ #include "qpid/broker/Exchange.h" #include "qpid/framing/SequenceSet.h" +#include namespace qpid { @@ -44,16 +45,18 @@ namespace ha { * Creates a ReplicatingSubscription on the primary by passing special * arguments to the consume command. * - * THREAD SAFE: Called in arbitrary connection threads. + * THREAD UNSAFE: Only called in the connection thread of the source queue. 
*/ class QueueReplicator : public broker::Exchange { public: static const std::string DEQUEUE_EVENT_KEY; static const std::string POSITION_EVENT_KEY; + static std::string replicatorName(const std::string& queueName); QueueReplicator(boost::shared_ptr q, boost::shared_ptr l); ~QueueReplicator(); + std::string getType() const; bool bind(boost::shared_ptr, const std::string&, const framing::FieldTable*); bool unbind(boost::shared_ptr, const std::string&, const framing::FieldTable*); @@ -67,6 +70,8 @@ class QueueReplicator : public broker::E sys::Mutex lock; boost::shared_ptr queue; boost::shared_ptr link; + + friend std::ostream& operator<<(std::ostream&, const QueueReplicator&); }; }} // namespace qpid::ha Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1221920&r1=1221919&r2=1221920&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Wed Dec 21 22:34:50 2011 @@ -20,7 +20,6 @@ */ #include "ReplicatingSubscription.h" -#include "Logging.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/ConnectionState.h" @@ -43,6 +42,13 @@ const string DOLLAR("$"); const string INTERNAL("-internal"); } // namespace + +ostream& operator<<(ostream& o, const ReplicatingSubscription& rs) { + string url = rs.parent->getSession().getConnection().getUrl(); + string qname= rs.getQueue()->getName(); + return o << "HA: Primary: " << qname << "(" << url << "):"; +} + string mask(const string& in) { return DOLLAR + in + INTERNAL; @@ -96,7 +102,7 @@ ReplicatingSubscription::ReplicatingSubs // can be re-introduced later. 
Last revision with the optimization: // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. - QPID_LOG(debug, "HA: Started " << *this << " subscription " << name); + QPID_LOG(debug, *this << "Created subscription " << name); // Note that broker::Queue::getPosition() returns the sequence // number that will be assigned to the next message *minus 1*. @@ -125,36 +131,39 @@ bool ReplicatingSubscription::deliver(Qu if (position - backupPosition > 1) { // Position has advanced because of messages dequeued ahead of us. SequenceNumber send(position); - // Send the position before m was enqueued. - sendPositionEvent(--send, l); + --send; // Send the position before m was enqueued. + sendPositionEvent(send, l); + QPID_LOG(trace, *this << "Sending position " << send + << ", was " << backupPosition); } backupPosition = position; } - QPID_LOG(trace, "HA: Replicating " << QueuePos(m) << " to " << *this); + QPID_LOG(trace, *this << "Replicating message " << m.position); } return ConsumerImpl::deliver(m); } +ReplicatingSubscription::~ReplicatingSubscription() {} + +// Called in the subscription's connection thread. void ReplicatingSubscription::cancel() { - QPID_LOG(debug, "HA: Cancelled " << *this); + QPID_LOG(debug, *this <<"Cancelled"); getQueue()->removeObserver(boost::dynamic_pointer_cast(shared_from_this())); } -ReplicatingSubscription::~ReplicatingSubscription() {} - -//called before we get notified of the message being available and -//under the message lock in the queue +// Called before we get notified of the message being available and +// under the message lock in the queue. Called in arbitrary connection thread. void ReplicatingSubscription::enqueued(const QueuedMessage& m) { //delay completion m.payload->getIngressCompletion().startCompleter(); } -// Called with lock held. +// Called with lock held. Called in subscription's connection thread. 
void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l) { - QPID_LOG(trace, "HA: Sending dequeues " << dequeues << " to " << *this); + QPID_LOG(trace, *this << "Sending dequeues " << dequeues); string buf(dequeues.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); dequeues.encode(buffer); @@ -163,12 +172,10 @@ void ReplicatingSubscription::sendDequeu sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l); } -// Called with lock held. +// Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendPositionEvent( SequenceNumber position, const sys::Mutex::ScopedLock&l ) { - QPID_LOG(trace, "HA: Sending position " << QueuePos(getQueue().get(), position) - << " on " << *this); string buf(backupPosition.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); position.encode(buffer); @@ -209,21 +216,24 @@ void ReplicatingSubscription::sendEvent( } // Called after the message has been removed from the deque and under -// the message lock in the queue. +// the message lock in the queue. Called in arbitrary connection threads. void ReplicatingSubscription::dequeued(const QueuedMessage& m) { + QPID_LOG(trace, *this << "Dequeued message " << m.position); { sys::Mutex::ScopedLock l(lock); dequeues.add(m.position); - QPID_LOG(trace, "HA: Will dequeue " << QueuePos(m) << " on " << *this); } notify(); // Ensure a call to doDispatch + // FIXME aconway 2011-12-20: not thread safe to access position here, + // we're not in the dispatch thread. if (m.position > position) { m.payload->getIngressCompletion().finishCompleter(); - QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early on " << *this); + QPID_LOG(trace, *this << "Completed message " << m.position << " early"); } } +// Called in subscription's connection thread. 
bool ReplicatingSubscription::doDispatch() { { @@ -235,19 +245,10 @@ bool ReplicatingSubscription::doDispatch ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {} ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {} -bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) -{ - return delegate.deliver(m); -} +bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { return delegate.deliver(m); } void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr msg) { return delegate.filter(msg); } bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr msg) { return delegate.accept(msg); } OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } - -ostream& operator<<(ostream& o, const ReplicatingSubscription& rs) { - string url = rs.parent->getSession().getConnection().getUrl(); - return o << rs.getQueue()->getName() << " backup on " << url; -} - }} // namespace qpid::ha Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1221920&r1=1221919&r2=1221920&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Wed Dec 21 22:34:50 2011 @@ -49,7 +49,7 @@ namespace ha { * Runs on the primary. Delays completion of messages till the backup * has acknowledged, informs backup of locally dequeued messages. 
* - * THREAD SAFE: Used as a consume in subscription's connection + * THREAD SAFE: Used as a consumer in subscription's connection * thread, and as a QueueObserver in arbitrary connection threads. */ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1221920&r1=1221919&r2=1221920&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Wed Dec 21 22:34:50 2011 @@ -229,8 +229,6 @@ void WiringReplicator::route(Deliverable Variant::Map& map = i->asMap(); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); - QPID_LOG(trace, "HA: Backup received event: schema=" << schema - << " values=" << values); if (match(schema)) doEventQueueDeclare(values); else if (match(schema)) doEventQueueDelete(values); else if (match(schema)) doEventExchangeDeclare(values); @@ -244,16 +242,15 @@ void WiringReplicator::route(Deliverable Variant::Map& values = i->asMap()[VALUES].asMap(); framing::FieldTable args; amqp_0_10::translate(values[ARGUMENTS].asMap(), args); - QPID_LOG(trace, "HA: Backup received response type=" << type - << " values=" << values); if (type == QUEUE) doResponseQueue(values); else if (type == EXCHANGE) doResponseExchange(values); else if (type == BINDING) doResponseBind(values); + else QPID_LOG(error, "HA: Backup received unknown response: type=" << type + << " values=" << values); + // FIXME aconway 2011-12-06: handle all relevant response types. 
} - } else { - QPID_LOG(error, "HA: Backup replication got unexpected message: " << *headers); - } + } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers); } catch (const std::exception& e) { QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list); } @@ -280,11 +277,11 @@ void WiringReplicator::doEventQueueDecla // re-create from event. // Events are always up to date, whereas responses may be // out of date. - QPID_LOG(debug, "HA: Created backup queue from event: " << name); + QPID_LOG(debug, "HA: Backup created queue: " << name); startQueueReplicator(result.first); } else { // FIXME aconway 2011-12-02: what's the right way to handle this? - QPID_LOG(warning, "HA: Queue already exists on backup: " << name); + QPID_LOG(warning, "HA: Backup queue already exists: " << name); } } } @@ -293,11 +290,14 @@ void WiringReplicator::doEventQueueDelet string name = values[QNAME].asString(); boost::shared_ptr queue = broker.getQueues().find(name); if (queue && replicateLevel(queue->getSettings())) { - QPID_LOG(debug, "HA: Deleting backup queue from event: " << name); + QPID_LOG(debug, "HA: Backup deleting queue: " << name); broker.deleteQueue( name, values[USER].asString(), values[RHOST].asString()); + // FIXME aconway 2011-12-21: causes race conditions? Restore. +// // Also delete the QueueReplicator exchange for this queue. +// broker.getExchanges().destroy(QueueReplicator::replicatorName(name)); } } @@ -316,11 +316,11 @@ void WiringReplicator::doEventExchangeDe values[USER].asString(), values[RHOST].asString()).second) { - QPID_LOG(debug, "HA: created backup exchange from event: " << name); + QPID_LOG(debug, "HA: Backup created exchange: " << name); } else { // FIXME aconway 2011-11-22: should delete pre-exisitng exchange // and re-create from event. See comment in doEventQueueDeclare.
- QPID_LOG(warning, "HA: Exchange already exists on backup: " << name); + QPID_LOG(warning, "HA: Backup exchange already exists: " << name); } } } @@ -330,7 +330,7 @@ void WiringReplicator::doEventExchangeDe try { boost::shared_ptr exchange = broker.getExchanges().find(name); if (exchange && replicateLevel(exchange->getArgs())) { - QPID_LOG(debug, "HA: Deleting backup exchange:" << name); + QPID_LOG(debug, "HA: Backup deleting exchange:" << name); broker.deleteExchange( name, values[USER].asString(), @@ -352,7 +352,7 @@ void WiringReplicator::doEventBind(Varia framing::FieldTable args; amqp_0_10::translate(values[ARGS].asMap(), args); string key = values[KEY].asString(); - QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName() + QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); exchange->bind(queue, key, &args); @@ -377,12 +377,12 @@ void WiringReplicator::doResponseQueue(V ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/); if (result.second) { - QPID_LOG(debug, "HA: Created backup queue from response: " << values[NAME]); + QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]); startQueueReplicator(result.first); } else { // FIXME aconway 2011-11-22: Normal to find queue already // exists if we're failing over. 
- QPID_LOG(warning, "HA: Queue already exists on backup: " << name); + QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name); } } @@ -400,9 +400,9 @@ void WiringReplicator::doResponseExchang ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/).second) { - QPID_LOG(debug, "HA: Created backup exchange from response: " << values[NAME]); + QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]); } else { - QPID_LOG(warning, "HA: Exchange already exists on backup: " << values[QNAME]); + QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " << values[QNAME]); } } @@ -442,7 +442,7 @@ void WiringReplicator::doResponseBind(Va amqp_0_10::translate(values[ARGUMENTS].asMap(), args); string key = values[KEY].asString(); exchange->bind(queue, key, &args); - QPID_LOG(debug, "HA: Created backup binding from response: exchange=" << exchange->getName() + QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); } Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py?rev=1221920&r1=1221919&r2=1221920&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py Wed Dec 21 22:34:50 2011 @@ -108,6 +108,7 @@ class ShortTests(BrokerTest): # Test a series of messages, enqueue all then dequeue all. s = p.sender(queue("foo","all")) + self.wait(b, "foo") msgs = [str(i) for i in range(10)] for m in msgs: s.send(Message(m)) self.assert_browse_retry(p, "foo", msgs) --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org