Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4A20B10D8A for ; Mon, 17 Jun 2013 14:19:41 +0000 (UTC) Received: (qmail 74727 invoked by uid 500); 17 Jun 2013 14:19:41 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 74646 invoked by uid 500); 17 Jun 2013 14:19:37 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 74625 invoked by uid 99); 17 Jun 2013 14:19:36 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Jun 2013 14:19:36 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Jun 2013 14:19:33 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 7906C23889BB; Mon, 17 Jun 2013 14:19:14 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1493771 [2/2] - in /qpid/trunk/qpid/cpp: include/qpid/types/ src/ src/qpid/broker/ src/qpid/ha/ src/qpid/types/ src/tests/ Date: Mon, 17 Jun 2013 14:19:12 -0000 To: commits@qpid.apache.org From: aconway@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130617141914.7906C23889BB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h?rev=1493771&r1=1493770&r2=1493771&view=diff 
============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h Mon Jun 17 14:19:10 2013 @@ -25,8 +25,9 @@ #include "ReplicationTest.h" #include "BrokerInfo.h" #include "types.h" +#include "hash.h" +#include "qpid/sys/unordered_map.h" #include -#include namespace qpid { @@ -58,17 +59,12 @@ class RemoteBackup RemoteBackup(const BrokerInfo&, broker::Connection*); ~RemoteBackup(); - /** Set all queues in the registry as catch-up queues. - *@createGuards if true create guards also, if false guards are created on demand. - */ - void setCatchupQueues(broker::QueueRegistry&, bool createGuards); - /** Return guard associated with a queue. Used to create ReplicatingSubscription. */ GuardPtr guard(const QueuePtr&); /** Is the remote backup connected? */ void setConnection(broker::Connection* c) { connection = c; } - bool isConnected() const { return connection; } + broker::Connection* getConnection() const { return connection; } /** ReplicatingSubscription associated with queue is ready. * Note: may set isReady() @@ -90,18 +86,26 @@ class RemoteBackup /**Cancel all queue guards, called if we are timed out. */ void cancel(); + /** Set a catch-up queue for this backup. + *@createGuard if true create a guard immediately. 
+ */ + void catchupQueue(const QueuePtr&, bool createGuard); + BrokerInfo getBrokerInfo() const { return brokerInfo; } + + void startCatchup() { started = true; } + private: - typedef std::map GuardMap; + typedef qpid::sys::unordered_map > GuardMap; typedef std::set QueueSet; - void catchupQueue(const QueuePtr&, bool createGuard); - std::string logPrefix; BrokerInfo brokerInfo; ReplicationTest replicationTest; GuardMap guards; QueueSet catchupQueues; + bool started; broker::Connection* connection; bool reportedReady; }; Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Mon Jun 17 14:19:10 2013 @@ -20,12 +20,16 @@ */ #include "makeMessage.h" +#include "IdSetter.h" #include "QueueGuard.h" -#include "QueueRange.h" #include "QueueReplicator.h" +#include "QueueSnapshots.h" #include "ReplicatingSubscription.h" #include "Primary.h" +#include "HaBroker.h" +#include "qpid/assert.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" @@ -35,6 +39,7 @@ #include "qpid/types/Uuid.h" #include + namespace qpid { namespace ha { @@ -45,53 +50,20 @@ using sys::Mutex; using broker::amqp_0_10::MessageTransfer; const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription"); -const string ReplicatingSubscription::QPID_BACK("qpid.ha-back"); -const string ReplicatingSubscription::QPID_FRONT("qpid.ha-front"); const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info"); +const string 
ReplicatingSubscription::QPID_ID_SET("qpid.ha-info"); -namespace { -const string DOLLAR("$"); -const string INTERNAL("-internal"); -} // namespace - -// Scan the queue for gaps and add them to the subscriptions dequed set. -class DequeueScanner -{ +class ReplicatingSubscription::QueueObserver : public broker::QueueObserver { public: - DequeueScanner( - ReplicatingSubscription& rs, - SequenceNumber front_, - SequenceNumber back_ // Inclusive - ) : subscription(rs), front(front_), back(back_) - { - assert(front <= back); - // INVARIANT deques have been added for positions <= at. - at = front - 1; - } - - void operator()(const Message& m) { - if (m.getSequence() >= front && m.getSequence() <= back) { - if (m.getSequence() > at+1) subscription.dequeued(at+1, m.getSequence()-1); - at = m.getSequence(); - } - } - - // Must call after scanning the queue. - void finish() { - if (at < back) subscription.dequeued(at+1, back); - } - + QueueObserver(ReplicatingSubscription& rs_) : rs(rs_) {} + void enqueued(const broker::Message&) {} + void dequeued(const broker::Message& m) { rs.dequeued(m.getReplicationId()); } + void acquired(const broker::Message&) {} + void requeued(const broker::Message&) {} private: - ReplicatingSubscription& subscription; - SequenceNumber front; - SequenceNumber back; - SequenceNumber at; + ReplicatingSubscription& rs; }; -string mask(const string& in) -{ - return DOLLAR + in + INTERNAL; -} /* Called by SemanticState::consume to create a consumer */ boost::shared_ptr @@ -110,6 +82,7 @@ ReplicatingSubscription::Factory::create boost::shared_ptr rs; if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) { rs.reset(new ReplicatingSubscription( + haBroker, parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); rs->initialize(); @@ -117,7 +90,15 @@ ReplicatingSubscription::Factory::create return rs; } +namespace { +void copyIf(boost::shared_ptr from, boost::shared_ptr& to) { + boost::shared_ptr result = 
boost::dynamic_pointer_cast(from); + if (result) to = result; +} +} // namespace + ReplicatingSubscription::ReplicatingSubscription( + HaBroker& hb, SemanticState* parent, const string& name, Queue::shared_ptr queue, @@ -130,7 +111,8 @@ ReplicatingSubscription::ReplicatingSubs const framing::FieldTable& arguments ) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag, resumeId, resumeTtl, arguments), - ready(false) + position(0), ready(false), cancelled(false), + haBroker(hb) { try { FieldTable ft; @@ -140,64 +122,57 @@ ReplicatingSubscription::ReplicatingSubs // Set a log prefix message that identifies the remote broker. ostringstream os; - os << "Primary " << queue->getName() << "@" << info << ": "; + os << "Subscription to " << queue->getName() << " at " << info << ": "; logPrefix = os.str(); - // NOTE: Once the guard is attached we can have concurrent - // calls to dequeued so we need to lock use of this->dequeues. - // - // However we must attach the guard _before_ we scan for - // initial dequeues to be sure we don't miss any dequeues - // between the scan and attaching the guard. + // If this is a non-cluster standalone replication then we need to + // set up an IdSetter if there is not already one. + boost::shared_ptr idSetter; + queue->getMessageInterceptors().each( + boost::bind(&copyIf, _1, boost::ref(idSetter))); + if (!idSetter) { + QPID_LOG(debug, logPrefix << "Standalone replication"); + queue->getMessageInterceptors().add( + boost::shared_ptr(new IdSetter(queue->getName(), 1))); + } + + // If there's already a guard (we are in failover) use it, else create one. if (Primary::get()) guard = Primary::get()->getGuard(queue, info); if (!guard) guard.reset(new QueueGuard(*queue, info)); - guard->attach(*this); - QueueRange backup(arguments); // Remote backup range. - QueueRange backupOriginal(backup); - QueueRange primary(guard->getRange()); // Unguarded range when the guard was set. 
- backupPosition = backup.back; - - // Sync backup and primary queues, don't send messages already on the backup - - if (backup.front > primary.front || // Missing messages at front - backup.back < primary.front || // No overlap - primary.empty() || backup.empty()) // Empty + // NOTE: Once the observer is attached we can have concurrent + // calls to dequeued so we need to lock use of this->dequeues. + // + // However we must attach the observer _before_ we snapshot for + // initial dequeues to be sure we don't miss any dequeues + // between the snapshot and attaching the observer. + observer.reset(new QueueObserver(*this)); + queue->addObserver(observer); + ReplicationIdSet primary = haBroker.getQueueSnapshots()->get(queue)->snapshot(); + std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET); + ReplicationIdSet backup; + if (!backupStr.empty()) backup = decodeStr(backupStr); + + // Initial dequeues are messages on backup but not on primary. + ReplicationIdSet initDequeues = backup - primary; + QueuePosition front,back; + queue->getRange(front, back, broker::REPLICATOR); // Outside lock, getRange locks queue { - // No useful overlap - erase backup and start from the beginning - if (!backup.empty()) dequeued(backup.front, backup.back); - position = primary.front-1; + sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued() + dequeues += initDequeues; // Messages on backup that are not on primary. + skip = backup - initDequeues; // Messages already on the backup. + + // Queue front is moving but we know this subscriptions will start at a + // position >= front so if front is safe then position must be. + position = front; + + QPID_LOG(debug, logPrefix << "Subscribed: front " << front + << ", back " << back + << ", start " << position + << ", guarded " << guard->getFirst() + << ", on backup " << skip); + checkReady(l); } - else { // backup and primary do overlap. - // Remove messages from backup that are not in primary. 
- if (primary.back < backup.back) { - dequeued(primary.back+1, backup.back); // Trim excess messages at back - backup.back = primary.back; - } - if (backup.front < primary.front) { - dequeued(backup.front, primary.front-1); // Trim excess messages at front - backup.front = primary.front; - } - DequeueScanner scan(*this, backup.front, backup.back); - // FIXME aconway 2012-06-15: Optimize queue traversal, only in range. - queue->eachMessage(boost::ref(scan)); // Remove missing messages in between. - scan.finish(); - position = backup.back; - //move cursor to position - queue->seek(*this, position); - } - // NOTE: we are assuming that the messages that are on the backup are - // consistent with those on the primary. If the backup is a replica - // queue and hasn't been tampered with then that will be the case. - - QPID_LOG(debug, logPrefix << "Subscribed:" - << " backup:" << backupOriginal << " adjusted backup:" << backup - << " primary:" << primary - << " catch-up: " << position << "-" << primary.back - << "(" << primary.back-position << ")"); - - // Check if we are ready yet. - if (guard->subscriptionStart(position)) setReady(); } catch (const std::exception& e) { QPID_LOG(error, logPrefix << "Creation error: " << e.what() @@ -208,6 +183,7 @@ ReplicatingSubscription::ReplicatingSubs ReplicatingSubscription::~ReplicatingSubscription() {} + // Called in subscription's connection thread when the subscription is created. // Called separate from ctor because sending events requires // shared_from_this @@ -215,12 +191,9 @@ ReplicatingSubscription::~ReplicatingSub void ReplicatingSubscription::initialize() { try { Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently. - - // Send initial dequeues and position to the backup. + // Send initial dequeues to the backup. // There must be a shared_ptr(this) when sending. 
sendDequeueEvent(l); - sendPositionEvent(position, l); - backupPosition = position; } catch (const std::exception& e) { QPID_LOG(error, logPrefix << "Initialization error: " << e.what() @@ -229,53 +202,64 @@ void ReplicatingSubscription::initialize } } +// True if the next position for the ReplicatingSubscription is a guarded position. +bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) { + return position+1 >= guard->getFirst(); +} + // Message is delivered in the subscription's connection thread. bool ReplicatingSubscription::deliver( const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) { + Mutex::ScopedLock l(lock); + ReplicationId id = m.getReplicationId(); + position = m.getSequence(); try { - QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence()); - { - Mutex::ScopedLock l(lock); - position = m.getSequence(); - - // m.getSequence() is the position of the new message on local queue. - // backupPosition is latest position on backup queue before enqueueing - if (m.getSequence() <= backupPosition) - throw Exception( - QPID_MSG(logPrefix << "Expected position > " << backupPosition - << " but got " << m.getSequence())); - if (m.getSequence() - backupPosition > 1) { - // Position has advanced because of messages dequeued ahead of us. - // Send the position before message was enqueued. - sendPositionEvent(m.getSequence()-1, l); - } - // Backup will automatically advance by 1 on delivery of message. - backupPosition = m.getSequence(); + bool result = false; + if (skip.contains(id)) { + skip -= id; + guard->complete(id); // This will never be acknowledged. + result = false; + } + else { + QPID_LOG(trace, logPrefix << "Replicated " << LogMessageId(*getQueue(), m)); + // Only consider unguarded messages for ready status. 
+ if (!ready && !isGuarded(l)) unacked += id; + sendIdEvent(id, l); + result = ConsumerImpl::deliver(c, m); } - return ConsumerImpl::deliver(c, m); + checkReady(l); + return result; } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence() + QPID_LOG(critical, logPrefix << "Error replicating " << LogMessageId(*getQueue(), m) << ": " << e.what()); throw; } } -void ReplicatingSubscription::setReady() { - { - Mutex::ScopedLock l(lock); - if (ready) return; +/** + *@param position: must be <= last position seen by subscription. + */ +void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) { + if (!ready && isGuarded(l) && unacked.empty()) { ready = true; + sys::Mutex::ScopedUnlock u(lock); + // Notify Primary that a subscription is ready. + QPID_LOG(debug, logPrefix << "Caught up"); + if (Primary::get()) Primary::get()->readyReplica(*this); } - // Notify Primary that a subscription is ready. - QPID_LOG(debug, logPrefix << "Caught up"); - if (Primary::get()) Primary::get()->readyReplica(*this); } // Called in the subscription's connection thread. void ReplicatingSubscription::cancel() { + { + Mutex::ScopedLock l(lock); + if (cancelled) return; + cancelled = true; + } QPID_LOG(debug, logPrefix << "Cancelled"); + getQueue()->removeObserver(observer); guard->cancel(); ConsumerImpl::cancel(); } @@ -283,10 +267,15 @@ void ReplicatingSubscription::cancel() // Consumer override, called on primary in the backup's IO thread. void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) { // Finish completion of message, it has been acknowledged by the backup. 
- QPID_LOG(trace, logPrefix << "Acknowledged " << r.getMessageId()); - guard->complete(r.getMessageId()); - // If next message is protected by the guard then we are ready - if (r.getMessageId() >= guard->getRange().back) setReady(); + ReplicationId id = r.getReplicationId(); + QPID_LOG(trace, logPrefix << "Acknowledged " << + LogMessageId(*getQueue(), r.getMessageId(), r.getReplicationId())); + guard->complete(id); + { + Mutex::ScopedLock l(lock); + unacked -= id; + checkReady(l); + } ConsumerImpl::acknowledged(r); } @@ -295,59 +284,36 @@ void ReplicatingSubscription::sendDequeu { if (dequeues.empty()) return; QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); - string buf(dequeues.encodedSize(),'\0'); - framing::Buffer buffer(&buf[0], buf.size()); - dequeues.encode(buffer); + string buffer = encodeStr(dequeues); dequeues.clear(); - buffer.reset(); - { - Mutex::ScopedUnlock u(lock); - sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l); - } + sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l); } -// Called via QueueObserver::dequeued override on guard. // Called after the message has been removed // from the deque and under the messageLock in the queue. Called in // arbitrary connection threads. -void ReplicatingSubscription::dequeued(const Message& m) +void ReplicatingSubscription::dequeued(ReplicationId id) { - QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence()); + QPID_LOG(trace, logPrefix << "Dequeued ID " << id); { Mutex::ScopedLock l(lock); - dequeues.add(m.getSequence()); + dequeues.add(id); } notify(); // Ensure a call to doDispatch } -// Called during construction while scanning for initial dequeues. -void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last) { - QPID_LOG(trace, logPrefix << "Initial dequeue [" << first << ", " << last << "]"); - { - Mutex::ScopedLock l(lock); - dequeues.add(first,last); - } -} // Called with lock held. Called in subscription's connection thread. 
-void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock& l) +void ReplicatingSubscription::sendIdEvent(ReplicationId pos, Mutex::ScopedLock& l) { - if (pos == backupPosition) return; // No need to send. - QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition); - string buf(pos.encodedSize(),'\0'); - framing::Buffer buffer(&buf[0], buf.size()); - pos.encode(buffer); - buffer.reset(); - { - Mutex::ScopedUnlock u(lock); - sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l); - } + sendEvent(QueueReplicator::ID_EVENT_KEY, encodeStr(pos), l); } void ReplicatingSubscription::sendEvent(const std::string& key, - const framing::Buffer& buffer, + const std::string& buffer, Mutex::ScopedLock&) { + Mutex::ScopedUnlock u(lock); broker::Message message = makeMessage(buffer); MessageTransfer& transfer = MessageTransfer::get(message); DeliveryProperties* props = @@ -370,7 +336,6 @@ bool ReplicatingSubscription::doDispatch return ConsumerImpl::doDispatch(); } catch (const std::exception& e) { - // FIXME aconway 2012-10-05: detect queue deletion, no warning. QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what()); return false; } Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1493771&r1=1493770&r2=1493771&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Mon Jun 17 14:19:10 2013 @@ -43,6 +43,7 @@ class Buffer; namespace ha { class QueueGuard; +class HaBroker; /** * A susbcription that replicates to a remote backup. @@ -61,30 +62,36 @@ class QueueGuard; * * Lifecycle: broker::Queue holds shared_ptrs to this as a consumer. 
* - * Lock Hierarchy: ReplicatingSubscription MUST NOT call QueueGuard with its - * lock held QueueGuard MAY call ReplicatingSubscription with its lock held. + * ReplicatingSubscription makes calls on QueueGuard, but not vice-versa. */ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl { public: typedef broker::SemanticState::ConsumerImpl ConsumerImpl; - struct Factory : public broker::ConsumerFactory { + class Factory : public broker::ConsumerFactory { + public: + Factory(HaBroker& hb) : haBroker(hb) {} + + HaBroker& getHaBroker() const { return haBroker; } + boost::shared_ptr create( broker::SemanticState* parent, const std::string& name, boost::shared_ptr , bool ack, bool acquire, bool exclusive, const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); + private: + HaBroker& haBroker; }; // Argument names for consume command. static const std::string QPID_REPLICATING_SUBSCRIPTION; - static const std::string QPID_BACK; - static const std::string QPID_FRONT; static const std::string QPID_BROKER_INFO; + static const std::string QPID_ID_SET; - ReplicatingSubscription(broker::SemanticState* parent, + ReplicatingSubscription(HaBroker& haBroker, + broker::SemanticState* parent, const std::string& name, boost::shared_ptr , bool ack, bool acquire, bool exclusive, const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, @@ -92,12 +99,6 @@ class ReplicatingSubscription : public b ~ReplicatingSubscription(); - // Called via QueueGuard::dequeued. - //@return true if the message requires completion. - void dequeued(const broker::Message&); - - // Called during initial scan for dequeues. - void dequeued(framing::SequenceNumber first, framing::SequenceNumber last); // Consumer overrides. 
bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg); @@ -121,19 +122,27 @@ class ReplicatingSubscription : public b bool doDispatch(); private: + class QueueObserver; + friend class QueueObserver; + std::string logPrefix; - framing::SequenceSet dequeues; - framing::SequenceNumber position; - framing::SequenceNumber backupPosition; + QueuePosition position; + ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event. + ReplicationIdSet skip; // Messages already on backup will be skipped. + ReplicationIdSet unacked; // Replicated but un-acknowledged. bool ready; + bool cancelled; BrokerInfo info; boost::shared_ptr guard; + HaBroker& haBroker; + boost::shared_ptr observer; + bool isGuarded(sys::Mutex::ScopedLock&); + void dequeued(ReplicationId); void sendDequeueEvent(sys::Mutex::ScopedLock&); - void sendPositionEvent(framing::SequenceNumber, sys::Mutex::ScopedLock&); - void setReady(); - void sendEvent(const std::string& key, const framing::Buffer&, - sys::Mutex::ScopedLock&); + void sendIdEvent(ReplicationId, sys::Mutex::ScopedLock&); + void sendEvent(const std::string& key, const std::string& data, sys::Mutex::ScopedLock&); + void checkReady(sys::Mutex::ScopedLock&); friend struct Factory; }; Copied: qpid/trunk/qpid/cpp/src/qpid/ha/hash.h (from r1493713, qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h) URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/hash.h?p2=qpid/trunk/qpid/cpp/src/qpid/ha/hash.h&p1=qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h&r1=1493713&r2=1493771&rev=1493771&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/hash.h Mon Jun 17 14:19:10 2013 @@ -1,5 +1,5 @@ -#ifndef QPID_HA_MAKEMESSAGE_H -#define QPID_HA_MAKEMESSAGE_H +#ifndef QPID_HA_HASH_H +#define QPID_HA_HASH_H /* * @@ -22,22 +22,21 @@ * */ -#include "qpid/broker/Message.h" +#include namespace qpid { 
-namespace framing { -class Buffer; -} namespace ha { -/** - * Create internal messages used by HA components. - */ -broker::Message makeMessage( - const framing::Buffer& content, - const std::string& destination=std::string() -); +template struct TrivialHasher { + size_t operator()(T value) const { return static_cast(value); } +}; + +template struct SharedPtrHasher { + size_t operator()(const boost::shared_ptr& ptr) const { + return reinterpret_cast(ptr.get()); + } +}; }} // namespace qpid::ha -#endif /*!QPID_HA_MAKEMESSAGE_H*/ +#endif /*!QPID_HA_HASH_H*/ Modified: qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.cpp Mon Jun 17 14:19:10 2013 @@ -54,4 +54,9 @@ broker::Message makeMessage(const framin return broker::Message(transfer, 0); } +broker::Message makeMessage(const std::string& content, const std::string& destination) { + framing::Buffer buffer(const_cast(&content[0]), content.size()); + return makeMessage(buffer, destination); +} + }} // namespace qpid::ha Modified: qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h?rev=1493771&r1=1493770&r2=1493771&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h Mon Jun 17 14:19:10 2013 @@ -23,6 +23,10 @@ */ #include "qpid/broker/Message.h" +#include "qpid/framing/Buffer.h" +#include + +/** Utilities for creating messages used by HA internally. 
*/ namespace qpid { namespace framing { @@ -38,6 +42,25 @@ broker::Message makeMessage( const std::string& destination=std::string() ); +broker::Message makeMessage(const std::string& content, + const std::string& destination=std::string()); + +/** Encode value as a string. */ +template std::string encodeStr(const T& value) { + std::string encoded(value.encodedSize(), '\0'); + framing::Buffer buffer(&encoded[0], encoded.size()); + value.encode(buffer); + return encoded; +} + +/** Decode value from a string. */ +template T decodeStr(const std::string& encoded) { + framing::Buffer buffer(const_cast(&encoded[0]), encoded.size()); + T value; + value.decode(buffer); + return value; +} + }} // namespace qpid::ha #endif /*!QPID_HA_MAKEMESSAGE_H*/ Modified: qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp Mon Jun 17 14:19:10 2013 @@ -21,6 +21,8 @@ #include "types.h" #include "qpid/Msg.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Queue.h" #include "qpid/Exception.h" #include #include @@ -83,4 +85,19 @@ ostream& operator<<(ostream& o, const Id return o; } +LogMessageId::LogMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id) : + queue(q.getName()), position(pos), replicationId(id) {} + +LogMessageId::LogMessageId(const broker::Queue& q, const broker::Message& m) : + queue(q.getName()), position(m.getSequence()), replicationId(m.getReplicationId()) {} + +LogMessageId::LogMessageId(const std::string& q, const broker::Message& m) : + queue(q), position(m.getSequence()), replicationId(m.getReplicationId()) {} + +std::ostream& operator<<(std::ostream& o, const LogMessageId& m) { + return o << m.queue << "[" << m.position << "]=" << m.replicationId; +} + + + 
}} // namespace qpid::ha Modified: qpid/trunk/qpid/cpp/src/qpid/ha/types.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/types.h?rev=1493771&r1=1493770&r2=1493771&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/types.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/types.h Mon Jun 17 14:19:10 2013 @@ -24,12 +24,20 @@ #include "qpid/types/Variant.h" #include "qpid/types/Uuid.h" +#include "qpid/framing/SequenceSet.h" + +#include + #include #include #include namespace qpid { +namespace broker { +class Message; +class Queue; +} namespace framing { class FieldTable; } @@ -106,5 +114,23 @@ class IdSet : public std::set QueuePtr; + LogMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id); + LogMessageId(const broker::Queue& q, const broker::Message& m); + LogMessageId(const std::string& q, const broker::Message& m); + const std::string& queue; + QueuePosition position; + ReplicationId replicationId; +}; +std::ostream& operator<<(std::ostream&, const LogMessageId&); + }} // qpid::ha #endif /*!QPID_HA_ENUM_H*/ Modified: qpid/trunk/qpid/cpp/src/qpid/types/Uuid.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/types/Uuid.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/types/Uuid.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/types/Uuid.cpp Mon Jun 17 14:19:10 2013 @@ -144,4 +144,12 @@ std::string Uuid::str() const return os.str(); } +size_t Uuid::hash() const { + std::size_t seed = 0; + for(size_t i = 0; i < SIZE; ++i) + seed ^= static_cast(bytes[i]) + 0x9e3779b9 + (seed << 6) + (seed >> 2); + return seed; +} + + }} // namespace qpid::types Modified: qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff 
============================================================================== --- qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp Mon Jun 17 14:19:10 2013 @@ -49,7 +49,7 @@ QPID_AUTO_TEST_CASE(testSort) list records; for (list::iterator i = ids.begin(); i != ids.end(); i++) { - DeliveryRecord r(QueueCursor(CONSUMER), framing::SequenceNumber(), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false); + DeliveryRecord r(QueueCursor(CONSUMER), framing::SequenceNumber(), SequenceNumber(), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false); r.setId(*i); records.push_back(r); } Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1493771&r1=1493770&r2=1493771&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original) +++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Mon Jun 17 14:19:10 2013 @@ -107,7 +107,7 @@ class HaBroker(Broker): ha_port = ha_port or HaPort(test) args = copy(args) args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=debug+:ha::", + "--log-enable=trace+:ha::", # FIXME aconway 2013-06-14: debug+ # Non-standard settings for faster tests. 
"--link-maintenance-interval=0.1", # Heartbeat and negotiate time are needed so that a broker wont @@ -177,7 +177,7 @@ acl allow all all self._status = self.ha_status() return self._status == status; except ConnectionError: return False - assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%( + assert retry(try_get_status, timeout=5), "%s expected=%r, actual=%r"%( self, status, self._status) def wait_queue(self, queue, timeout=1): Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1493771&r1=1493770&r2=1493771&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original) +++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Mon Jun 17 14:19:10 2013 @@ -18,7 +18,7 @@ # under the License. # -import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random +import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest import traceback from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty from qpid.datatypes import uuid4, UUID @@ -100,7 +100,7 @@ class ReplicationTests(HaBrokerTest): self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"]) # Verify exchange with replicate=configuration - b.sender(prefix+"e2/key2").send(Message(prefix+"e2")) + b.sender(prefix+"e2/key2").send(Message(prefix+"e2")) self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"]) b.sender(prefix+"e4/key4").send(Message("drop2")) # Verify unbind. 
@@ -284,9 +284,9 @@ class ReplicationTests(HaBrokerTest): # Set up replication with qpid-ha backup.replicate(primary.host_port(), "q") - ps.send("a") + ps.send("a", timeout=1) backup.assert_browse_backup("q", ["a"]) - ps.send("b") + ps.send("b", timeout=1) backup.assert_browse_backup("q", ["a", "b"]) self.assertEqual("a", pr.fetch().content) pr.session.acknowledge() @@ -295,11 +295,11 @@ class ReplicationTests(HaBrokerTest): # Set up replication with qpid-config ps2 = pc.session().sender("q2;{create:always}") backup.config_replicate(primary.host_port(), "q2"); - ps2.send("x") + ps2.send("x", timeout=1) backup.assert_browse_backup("q2", ["x"]) finally: l.restore() - def test_queue_replica_failover(self): + def test_standalone_queue_replica_failover(self): """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over.""" l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. @@ -319,6 +319,7 @@ class ReplicationTests(HaBrokerTest): backup.assert_browse_backup("q", ["a"]) ps.send("b") backup.assert_browse_backup("q", ["a", "b"]) + cluster[0].wait_status("ready") cluster.bounce(1) self.assertEqual("a", pr.fetch().content) pr.session.acknowledge() @@ -331,16 +332,20 @@ class ReplicationTests(HaBrokerTest): """Verify that we replicate to an LVQ correctly""" cluster = HaCluster(self, 2) s = cluster[0].connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}") - def send(key,value): s.send(Message(content=value,properties={"lvq-key":key})) - for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]: - send(*kv) - cluster[1].assert_browse_backup("lvq", ["b-1", "a-3", "c-2"]) - send("b","b-2") - cluster[1].assert_browse_backup("lvq", ["a-3", "c-2", "b-2"]) - send("c","c-3") - cluster[1].assert_browse_backup("lvq", ["a-3", "b-2", "c-3"]) - send("d","d-1") - cluster[1].assert_browse_backup("lvq", ["a-3", "b-2", "c-3", "d-1"]) + + def 
send(key,value,expect): + s.send(Message(content=value,properties={"lvq-key":key}), timeout=1) + cluster[1].assert_browse_backup("lvq", expect) + + send("a", "a-1", ["a-1"]) + send("b", "b-1", ["a-1", "b-1"]) + send("a", "a-2", ["b-1", "a-2"]) + send("a", "a-3", ["b-1", "a-3"]) + send("c", "c-1", ["b-1", "a-3", "c-1"]) + send("c", "c-2", ["b-1", "a-3", "c-2"]) + send("b", "b-2", ["a-3", "c-2", "b-2"]) + send("c", "c-3", ["a-3", "b-2", "c-3"]) + send("d", "d-1", ["a-3", "b-2", "c-3", "d-1"]) def test_ring(self): """Test replication with the ring queue policy""" @@ -416,8 +421,8 @@ class ReplicationTests(HaBrokerTest): def send(self, connection): """Send messages, then acquire one but don't acknowledge""" s = connection.session() - for m in range(10): s.sender(self.address).send(str(m)) - s.receiver(self.address).fetch() + for m in range(10): s.sender(self.address).send(str(m), timeout=1) + s.receiver(self.address, timeout=1).fetch() def verify(self, brokertest, backup): backup.assert_browse_backup(self.queue, self.expect, msg=self.queue) @@ -959,8 +964,7 @@ class LongTests(HaBrokerTest): for s in senders: s.sender.assert_running() for r in receivers: r.receiver.assert_running() checkpoint = [ r.received+100 for r in receivers ] - dead = None - victim = random.randint(0,2) + victim = random.choice([0,1,2,primary]) # Give the primary a better chance. if victim == primary: # Don't kill primary till it is active and the next # backup is ready, otherwise we can lose messages. 
@@ -984,13 +988,8 @@ class LongTests(HaBrokerTest): finally: for s in senders: s.stop() for r in receivers: r.stop() - unexpected_dead = [] - for i in xrange(3): - if not brokers[i].is_running() and i != dead: - unexpected_dead.append(i) - if brokers[i].is_running(): brokers.kill(i, False) - if unexpected_dead: - raise Exception("Brokers not running: %s"%unexpected_dead) + dead = filter(lambda i: not brokers[i].is_running(), xrange(3)) + if dead: raise Exception("Brokers not running: %s"%dead) def test_qmf_order(self): """QPID 4402: HA QMF events can be out of order. @@ -1066,7 +1065,7 @@ class RecoveryTests(HaBrokerTest): # Create a queue before the failure. s1 = cluster.connect(0).session().sender("q1;{create:always}") for b in cluster: b.wait_backup("q1") - for i in xrange(10): s1.send(str(i)) + for i in xrange(10): s1.send(str(i), timeout=0.1) # Kill primary and 2 backups cluster[3].wait_status("ready") @@ -1125,17 +1124,17 @@ class RecoveryTests(HaBrokerTest): cluster[0].wait_status("active") # Primary ready for b in cluster[1:3]: b.wait_status("ready") # Backups ready for i in [0,1]: cluster.kill(i, False) - cluster[2].promote() # New primary, backups will be 1 and 2 + cluster[2].promote() # New primary, expected backup will 1 cluster[2].wait_status("recovering") # Should not go active till the expected backup connects or times out. self.assertEqual(cluster[2].ha_status(), "recovering") - # Messages should be held expected backup times out + # Messages should be held till expected backup times out s = cluster[2].connect().session().sender("q;{create:always}") - for i in xrange(100): s.send(str(i), sync=False) + s.send("foo", sync=False) # Verify message held initially. try: s.sync(timeout=.01); self.fail("Expected Timeout exception") except Timeout: pass - s.sync(timeout=1) # And released after the timeout. + s.sync(timeout=1) # And released after the timeout. 
self.assertEqual(cluster[2].ha_status(), "active") def test_join_ready_cluster(self): Modified: qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark?rev=1493771&r1=1493770&r2=1493771&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark (original) +++ qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark Mon Jun 17 14:19:10 2013 @@ -1,5 +1,5 @@ #!/bin/sh -# +echo# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -26,8 +26,8 @@ REPEAT="--repeat 10" QUEUES="-q 6" SENDERS="-s 3" RECEIVERS="-r 3" -BROKERS= # Local broker -CLIENT_HOSTS= # No ssh, all clients are local +BROKERS= # Local broker +CLIENT_HOSTS= # No ssh, all clients are local # Connection options TCP_NODELAY=false RECONNECT=true --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org