qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1493771 [1/2] - in /qpid/trunk/qpid/cpp: include/qpid/types/ src/ src/qpid/broker/ src/qpid/ha/ src/qpid/types/ src/tests/
Date Mon, 17 Jun 2013 14:19:12 GMT
Author: aconway
Date: Mon Jun 17 14:19:10 2013
New Revision: 1493771

URL: http://svn.apache.org/r1493771
Log:
QPID-4348: HA Use independent sequence numbers for identifying messages

Previously HA code used queue sequence numbers to identify messages.
This assumes that message sequence is identical on primary and backup.

Implementing new features (for example transactions) requires that we tolerate
ordering differences between primary and backups.

This patch introduces a new, queue-scoped HA sequence number managed by the HA
plugin.  The HA ID is set *before* the message is enqueued and before it is
assigned a queue sequence number. This means it is possible to identify messages
before they are enqueued, e.g. messages in an open transaction.

Added:
    qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshot.h
      - copied, changed from r1493713, qpid/trunk/qpid/cpp/src/qpid/broker/Observers.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshots.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/ha/README.md   (with props)
    qpid/trunk/qpid/cpp/src/qpid/ha/hash.h
      - copied, changed from r1493713, qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h
Removed:
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueRange.h
Modified:
    qpid/trunk/qpid/cpp/include/qpid/types/Uuid.h
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/ha.mk
    qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageInterceptor.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Observers.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h
    qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/types.h
    qpid/trunk/qpid/cpp/src/qpid/types/Uuid.cpp
    qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py
    qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark

Modified: qpid/trunk/qpid/cpp/include/qpid/types/Uuid.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/types/Uuid.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/types/Uuid.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/types/Uuid.h Mon Jun 17 14:19:10 2013
@@ -57,7 +57,7 @@ class QPID_TYPES_CLASS_EXTERN Uuid
     /** String value in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb. */
     QPID_TYPES_EXTERN std::string str() const;
 
-    QPID_TYPES_EXTERN size_t size() const;
+ QPID_TYPES_EXTERN size_t size() const;
     QPID_TYPES_EXTERN const unsigned char* data() const;
 
     friend QPID_TYPES_EXTERN bool operator==(const Uuid&, const Uuid&);
@@ -69,6 +69,14 @@ class QPID_TYPES_CLASS_EXTERN Uuid
     friend QPID_TYPES_EXTERN std::ostream& operator<<(std::ostream&, Uuid);
     friend QPID_TYPES_EXTERN std::istream& operator>>(std::istream&, Uuid&);
 
+    /** Hash value suitable for use with unordered_map */
+    size_t hash() const;
+
+    /** Hasher for use with unordered_map */
+    struct Hasher {
+        size_t operator()(const Uuid& u) const { return u.hash(); }
+    };
+
   private:
     unsigned char bytes[16];
 };
@@ -91,4 +99,5 @@ QPID_TYPES_EXTERN std::istream& operator
 
 }} // namespace qpid::types
 
+
 #endif  /*!QPID_TYPES_UUID_H*/

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Mon Jun 17 14:19:10 2013
@@ -628,16 +628,14 @@ set (ha_default ON)
 option(BUILD_HA "Build Active-Passive HA plugin" ${ha_default})
 if (BUILD_HA)
     set (ha_SOURCES
+	qpid/ha/QueueSnapshot.h
+	qpid/ha/QueueSnapshots.h
         qpid/ha/AlternateExchangeSetter.h
-	qpid/ha/BackupConnectionExcluder.h
-	qpid/ha/BrokerInfo.cpp
-	qpid/ha/BrokerInfo.h
-	qpid/ha/QueueGuard.cpp
-	qpid/ha/QueueGuard.h
-	qpid/ha/ReplicationTest.cpp
-	qpid/ha/ReplicationTest.h
         qpid/ha/Backup.cpp
         qpid/ha/Backup.h
+        qpid/ha/BackupConnectionExcluder.h
+        qpid/ha/BrokerInfo.cpp
+        qpid/ha/BrokerInfo.h
         qpid/ha/BrokerReplicator.cpp
         qpid/ha/BrokerReplicator.h
         qpid/ha/ConnectionObserver.cpp
@@ -647,26 +645,32 @@ if (BUILD_HA)
         qpid/ha/HaBroker.cpp
         qpid/ha/HaBroker.h
         qpid/ha/HaPlugin.cpp
-	qpid/ha/makeMessage.cpp
-	qpid/ha/makeMessage.h
+        qpid/ha/hash.h
+	qpid/ha/IdSetter.h
+        qpid/ha/QueueSnapshot.h
+        qpid/ha/makeMessage.cpp
+        qpid/ha/makeMessage.h
         qpid/ha/Membership.cpp
         qpid/ha/Membership.h
         qpid/ha/Primary.cpp
         qpid/ha/Primary.h
-        qpid/ha/QueueRange.h
+        qpid/ha/QueueGuard.cpp
+        qpid/ha/QueueGuard.h
         qpid/ha/QueueReplicator.cpp
         qpid/ha/QueueReplicator.h
+        qpid/ha/RemoteBackup.cpp
+        qpid/ha/RemoteBackup.h
         qpid/ha/ReplicatingSubscription.cpp
         qpid/ha/ReplicatingSubscription.h
+        qpid/ha/ReplicationTest.cpp
+        qpid/ha/ReplicationTest.h
+        qpid/ha/Role.h
         qpid/ha/Settings.h
-	qpid/ha/StatusCheck.cpp
-	qpid/ha/StatusCheck.h
+        qpid/ha/StandAlone.h
+        qpid/ha/StatusCheck.cpp
+        qpid/ha/StatusCheck.h
         qpid/ha/types.cpp
         qpid/ha/types.h
-        qpid/ha/RemoteBackup.cpp
-        qpid/ha/RemoteBackup.h
-	qpid/ha/Role.h
-	qpid/ha/StandAlone.h
     )
 
     add_library (ha MODULE ${ha_SOURCES})

Modified: qpid/trunk/qpid/cpp/src/ha.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/ha.mk?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/ha.mk (original)
+++ qpid/trunk/qpid/cpp/src/ha.mk Mon Jun 17 14:19:10 2013
@@ -33,11 +33,14 @@ ha_la_SOURCES =					\
   qpid/ha/BrokerReplicator.h			\
   qpid/ha/ConnectionObserver.cpp		\
   qpid/ha/ConnectionObserver.h			\
-  qpid/ha/FailoverExchange.cpp				\
-  qpid/ha/FailoverExchange.h				\
+  qpid/ha/FailoverExchange.cpp			\
+  qpid/ha/FailoverExchange.h			\
   qpid/ha/HaBroker.cpp				\
   qpid/ha/HaBroker.h				\
   qpid/ha/HaPlugin.cpp				\
+  qpid/ha/hash.h				\
+  qpid/ha/IdSetter.h				\
+  qpid/ha/QueueSnapshot.h			\
   qpid/ha/makeMessage.cpp			\
   qpid/ha/makeMessage.h				\
   qpid/ha/Membership.cpp			\
@@ -46,20 +49,21 @@ ha_la_SOURCES =					\
   qpid/ha/Primary.h				\
   qpid/ha/QueueGuard.cpp			\
   qpid/ha/QueueGuard.h				\
-  qpid/ha/QueueRange.h				\
   qpid/ha/QueueReplicator.cpp			\
   qpid/ha/QueueReplicator.h			\
+  qpid/ha/QueueSnapshot.h			\
+  qpid/ha/QueueSnapshots.h			\
+  qpid/ha/RemoteBackup.cpp			\
+  qpid/ha/RemoteBackup.h			\
   qpid/ha/ReplicatingSubscription.cpp		\
   qpid/ha/ReplicatingSubscription.h		\
   qpid/ha/ReplicationTest.cpp			\
   qpid/ha/ReplicationTest.h			\
+  qpid/ha/Role.h				\
   qpid/ha/Settings.h				\
+  qpid/ha/StandAlone.h				\
   qpid/ha/StatusCheck.cpp			\
   qpid/ha/StatusCheck.h				\
-  qpid/ha/RemoteBackup.cpp			\
-  qpid/ha/RemoteBackup.h			\
-  qpid/ha/Role.h				\
-  qpid/ha/StandAlone.h				\
   qpid/ha/types.cpp				\
   qpid/ha/types.h
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Jun 17 14:19:10 2013
@@ -35,6 +35,7 @@ using std::string;
 
 DeliveryRecord::DeliveryRecord(const QueueCursor& _msg,
                                framing::SequenceNumber _msgId,
+                               framing::SequenceNumber _replicationId,
                                const Queue::shared_ptr& _queue,
                                const std::string& _tag,
                                const boost::shared_ptr<Consumer>& _consumer,
@@ -52,7 +53,8 @@ DeliveryRecord::DeliveryRecord(const Que
                                                    ended(accepted && acquired),
                                                    windowing(_windowing),
                                                    credit(_credit),
-                                                   msgId(_msgId)
+                                                   msgId(_msgId),
+                                                   replicationId(_replicationId)
 {}
 
 bool DeliveryRecord::setEnded()

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Mon Jun 17 14:19:10 2013
@@ -68,9 +68,12 @@ class DeliveryRecord
      */
     uint32_t credit;
     framing::SequenceNumber msgId;
+    framing::SequenceNumber replicationId;
 
   public:
-    QPID_BROKER_EXTERN DeliveryRecord(const QueueCursor& msgCursor, framing::SequenceNumber msgId,
+    QPID_BROKER_EXTERN DeliveryRecord(const QueueCursor& msgCursor,
+                                      framing::SequenceNumber msgId,
+                                      framing::SequenceNumber replicationId,
                                       const boost::shared_ptr<Queue>& queue, 
                                       const std::string& tag,
                                       const boost::shared_ptr<Consumer>& consumer,
@@ -111,6 +114,7 @@ class DeliveryRecord
     const QueueCursor& getMessage() const { return msg; }
     framing::SequenceNumber getId() const { return id; }
     framing::SequenceNumber getMessageId() const { return msgId; }
+    framing::SequenceNumber getReplicationId() const { return replicationId; }
     boost::shared_ptr<Queue> getQueue() const { return queue; }
 
     friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.cpp Mon Jun 17 14:19:10 2013
@@ -38,6 +38,7 @@ void Lvq::push(Message& message, bool is
     {
         qpid::sys::Mutex::ScopedLock locker(messageLock);
         message.setSequence(++sequence);
+        interceptors.publish(message);
         removed = messageMap.update(message, old);
         listeners.populate(copy);
         observeEnqueue(message, locker);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Mon Jun 17 14:19:10 2013
@@ -42,12 +42,18 @@ using std::string;
 namespace qpid {
 namespace broker {
 
-Message::Message() : deliveryCount(-1), publisher(0), expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false) {}
+Message::Message() : deliveryCount(-1), publisher(0), expiration(FAR_FUTURE), timestamp(0),
+                     isManagementMessage(false), replicationId(0)
+{}
+
 Message::Message(boost::intrusive_ptr<Encoding> e, boost::intrusive_ptr<PersistableMessage> p)
-    : encoding(e), persistentContext(p), deliveryCount(-1), publisher(0), expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false)
+    : encoding(e), persistentContext(p), deliveryCount(-1), publisher(0),
+      expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false),
+      replicationId(0)
 {
     if (persistentContext) persistentContext->setIngressCompletion(e);
 }
+
 Message::~Message() {}
 
 
@@ -308,4 +314,9 @@ void Message::processProperties(MapHandl
     encoding->processProperties(handler);
 }
 
+uint64_t Message::getReplicationId() const { return replicationId; }
+
+void Message::setReplicationId(framing::SequenceNumber id) { replicationId = id; }
+
+
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Mon Jun 17 14:19:10 2013
@@ -131,6 +131,9 @@ public:
 
     QPID_BROKER_EXTERN boost::intrusive_ptr<AsyncCompletion> getIngressCompletion() const;
     QPID_BROKER_EXTERN boost::intrusive_ptr<PersistableMessage> getPersistentContext() const;
+    QPID_BROKER_EXTERN uint64_t getReplicationId() const;
+    QPID_BROKER_EXTERN void setReplicationId(framing::SequenceNumber id);
+
   private:
     boost::intrusive_ptr<Encoding> encoding;
     boost::intrusive_ptr<PersistableMessage> persistentContext;
@@ -143,6 +146,7 @@ public:
     bool isManagementMessage;
     MessageState state;
     qpid::framing::SequenceNumber sequence;
+    framing::SequenceNumber replicationId;
 
     void annotationsChanged();
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageInterceptor.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageInterceptor.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageInterceptor.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageInterceptor.h Mon Jun 17 14:19:10 2013
@@ -37,12 +37,17 @@ class MessageInterceptor
   public:
     virtual ~MessageInterceptor() {}
 
+    /** Modify a message before it is recorded in durable store */
+    virtual void record(Message&) {}
     /** Modify a message as it is being published onto the queue. */
-    virtual void publish(Message&) = 0;
+    virtual void publish(Message&) {}
 };
 
 class MessageInterceptors : public Observers<MessageInterceptor> {
   public:
+    void record(Message& m) {
+        each(boost::bind(&MessageInterceptor::record, _1, boost::ref(m)));
+    }
     void publish(Message& m) {
         each(boost::bind(&MessageInterceptor::publish, _1, boost::ref(m)));
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Observers.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Observers.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Observers.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Observers.h Mon Jun 17 14:19:10 2013
@@ -48,12 +48,6 @@ class Observers
         observers.erase(i);
     }
 
-  protected:
-    typedef std::vector<boost::shared_ptr<Observer> > List;
-
-    sys::Mutex lock;
-    List observers;
-
     template <class F> void each(F f) {
         List copy;
         {
@@ -62,6 +56,12 @@ class Observers
         }
         std::for_each(copy.begin(), copy.end(), f);
     }
+
+  protected:
+    typedef std::vector<boost::shared_ptr<Observer> > List;
+
+    sys::Mutex lock;
+    List observers;
 };
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Jun 17 14:19:10 2013
@@ -285,9 +285,9 @@ void Queue::deliverTo(Message msg, TxBuf
         } else {
             if (enqueue(0, msg)) {
                 push(msg);
-                QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
+                QPID_LOG(debug, "Message " << msg.getSequence() << " enqueued on " << name);
             } else {
-                QPID_LOG(debug, "Message " << msg << " dropped from " << name);
+                QPID_LOG(debug, "Message " << msg.getSequence() << " dropped from " << name);
             }
         }
     }
@@ -415,7 +415,8 @@ bool Queue::getNextMessage(Message& m, C
             if (c->filter(*msg)) {
                 if (c->accept(*msg)) {
                     if (c->preAcquires()) {
-                        QPID_LOG(debug, "Attempting to acquire message " << msg << " from '" << name << "' with state " << msg->getState());
+                        QPID_LOG(debug, "Attempting to acquire message " << msg->getSequence()
+                                 << " from '" << name << "' with state " << msg->getState());
                         if (allocator->acquire(c->getName(), *msg)) {
                             if (mgmtObject) {
                                 mgmtObject->inc_acquires();
@@ -825,6 +826,7 @@ void Queue::setLastNodeFailure()
  */
 bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
 {
+    interceptors.record(msg);
     ScopedUse u(barrier);
     if (!u.acquired) return false;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h Mon Jun 17 14:19:10 2013
@@ -70,7 +70,6 @@ class QueueObserver
     virtual void consumerAdded( const Consumer& ) {};
     virtual void consumerRemoved( const Consumer& ) {};
     virtual void destroy() {};
- private:
 };
 }} // namespace qpid::broker
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Jun 17 14:19:10 2013
@@ -360,7 +360,7 @@ bool SemanticStateConsumerImpl::deliver(
 {
     allocateCredit(msg);
     boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
-    DeliveryRecord record(cursor, msg.getSequence(), queue, getTag(),
+    DeliveryRecord record(cursor, msg.getSequence(), msg.getReplicationId(), queue, getTag(),
                           consumer, acquire, !ackExpected, credit.isWindowMode(), transfer->getRequiredCredit());
     bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
     if (sync) deliveryCount = 0;//reset

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Mon Jun 17 14:19:10 2013
@@ -55,13 +55,7 @@ Backup::Backup(HaBroker& hb, const Setti
     statusCheck(
         new StatusCheck(
             logPrefix, broker.getOptions().linkHeartbeatInterval, hb.getBrokerInfo()))
-{
-    // Set link properties to tag outgoing links.
-    framing::FieldTable linkProperties = broker.getLinkClientProperties();
-    linkProperties.setTable(
-        ConnectionObserver::BACKUP_TAG, hb.getBrokerInfo().asFieldTable());
-    broker.setLinkClientProperties(linkProperties);
-}
+{}
 
 void Backup::setBrokerUrl(const Url& brokers) {
     if (brokers.empty()) return;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp Mon Jun 17 14:19:10 2013
@@ -92,7 +92,7 @@ void BrokerInfo::assign(const Variant::M
 }
 
 std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {
-    o  << b.getSystemId().str().substr(0,7);
+    o  << b.getSystemId().str().substr(0,8);
     if (b.getAddress() != empty) o << "@" << b.getAddress();
     o << "(" << printable(b.getStatus()) << ")";
     return o;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h Mon Jun 17 14:19:10 2013
@@ -27,6 +27,7 @@
 #include "qpid/framing/FieldTable.h"
 #include "qpid/types/Uuid.h"
 #include "qpid/types/Variant.h"
+#include "qpid/sys/unordered_map.h"
 #include <string>
 #include <iosfwd>
 #include <vector>
@@ -41,7 +42,7 @@ class BrokerInfo
 {
   public:
     typedef std::set<BrokerInfo> Set;
-    typedef std::map<types::Uuid, BrokerInfo> Map;
+    typedef qpid::sys::unordered_map<types::Uuid, BrokerInfo, types::Uuid::Hasher> Map;
 
     BrokerInfo();
     BrokerInfo(const types::Uuid& id, BrokerStatus, const Address& = Address());
@@ -50,9 +51,9 @@ class BrokerInfo
 
     types::Uuid getSystemId() const { return systemId; }
     BrokerStatus getStatus() const { return status; }
-    Address getAddress() const { return address; }
-
     void setStatus(BrokerStatus s)  { status = s; }
+    Address getAddress() const { return address; }
+    void setAddress(const Address& a) { address = a; }
 
     framing::FieldTable asFieldTable() const;
     types::Variant::Map asMap() const;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Mon Jun 17 14:19:10 2013
@@ -124,6 +124,8 @@ const string BROKER("broker");
 const string MEMBERS("members");
 const string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
 
+const string COLON(":");
+
 void sendQuery(const string& packageName, const string& className, const string& queueName,
                SessionHandler& sessionHandler)
 {
@@ -282,6 +284,14 @@ class BrokerReplicator::UpdateTracker {
     ReplicationTest repTest;
 };
 
+namespace {
+template <class EventType> std::string key() {
+    pair<string,string> name = EventType::getFullName();
+    return name.first + COLON + name.second;
+}
+}
+
+
 BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
     : Exchange(QPID_CONFIGURATION_REPLICATOR),
       logPrefix("Backup: "), replicationTest(NONE),
@@ -298,14 +308,14 @@ BrokerReplicator::BrokerReplicator(HaBro
     args.setString(QPID_REPLICATE, printable(NONE).str());
     setArgs(args);
 
-    dispatch[EventQueueDeclare::getFullName()] = &BrokerReplicator::doEventQueueDeclare;
-    dispatch[EventQueueDelete::getFullName()] = &BrokerReplicator::doEventQueueDelete;
-    dispatch[EventExchangeDeclare::getFullName()] = &BrokerReplicator::doEventExchangeDeclare;
-    dispatch[EventExchangeDelete::getFullName()] = &BrokerReplicator::doEventExchangeDelete;
-    dispatch[EventBind::getFullName()] = &BrokerReplicator::doEventBind;
-    dispatch[EventUnbind::getFullName()] = &BrokerReplicator::doEventUnbind;
-    dispatch[EventMembersUpdate::getFullName()] = &BrokerReplicator::doEventMembersUpdate;
-    dispatch[EventSubscribe::getFullName()] = &BrokerReplicator::doEventSubscribe;
+    dispatch[key<EventQueueDeclare>()] = &BrokerReplicator::doEventQueueDeclare;
+    dispatch[key<EventQueueDelete>()] = &BrokerReplicator::doEventQueueDelete;
+    dispatch[key<EventExchangeDeclare>()] = &BrokerReplicator::doEventExchangeDeclare;
+    dispatch[key<EventExchangeDelete>()] = &BrokerReplicator::doEventExchangeDelete;
+    dispatch[key<EventBind>()] = &BrokerReplicator::doEventBind;
+    dispatch[key<EventUnbind>()] = &BrokerReplicator::doEventUnbind;
+    dispatch[key<EventMembersUpdate>()] = &BrokerReplicator::doEventMembersUpdate;
+    dispatch[key<EventSubscribe>()] = &BrokerReplicator::doEventSubscribe;
 }
 
 void BrokerReplicator::initialize() {
@@ -402,7 +412,7 @@ void BrokerReplicator::connected(Bridge&
     peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_HA, FieldTable());
     //subscribe to the queue
     FieldTable arguments;
-    arguments.setInt(QueueReplicator::QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
+    arguments.setInt(QueueReplicator::QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
     peer.getMessage().subscribe(
         queueName, args.i_dest, 1/*accept-none*/, 0/*pre-acquired*/,
         false/*exclusive*/, "", 0, arguments);
@@ -439,7 +449,9 @@ void BrokerReplicator::route(Deliverable
                 QPID_LOG(trace, "Broker replicator event: " << map);
                 Variant::Map& schema = map[SCHEMA_ID].asMap();
                 Variant::Map& values = map[VALUES].asMap();
-                EventKey key(schema[PACKAGE_NAME], schema[CLASS_NAME]);
+                std::string key = (schema[PACKAGE_NAME].asString() +
+                                   COLON +
+                                   schema[CLASS_NAME].asString());
                 EventDispatchMap::iterator j = dispatch.find(key);
                 if (j != dispatch.end()) (this->*(j->second))(values);
             }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Mon Jun 17 14:19:10 2013
@@ -29,6 +29,7 @@
 #include "qpid/broker/Exchange.h"
 #include "qpid/types/Variant.h"
 #include "qpid/management/ManagementObject.h"
+#include "qpid/sys/unordered_map.h"
 #include <boost/shared_ptr.hpp>
 #include <boost/enable_shared_from_this.hpp>
 #include <set>
@@ -88,11 +89,10 @@ class BrokerReplicator : public broker::
     typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult;
     typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult;
 
-    typedef std::pair<std::string,std::string> EventKey;
     typedef void (BrokerReplicator::*DispatchFunction)(types::Variant::Map&);
-    typedef std::map<EventKey, DispatchFunction> EventDispatchMap;
+    typedef qpid::sys::unordered_map<std::string, DispatchFunction> EventDispatchMap;
 
-    typedef std::map<std::string, QueueReplicatorPtr> QueueReplicatorMap;
+    typedef qpid::sys::unordered_map<std::string, QueueReplicatorPtr> QueueReplicatorMap;
 
     class UpdateTracker;
     class ErrorListener;
@@ -152,7 +152,6 @@ class BrokerReplicator : public broker::
     bool initialized;
     AlternateExchangeSetter alternates;
     qpid::Address primary;
-    typedef std::set<std::string> StringSet;
     broker::Connection* connection;
     EventDispatchMap dispatch;
     std::auto_ptr<UpdateTracker> queueTracker;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp Mon Jun 17 14:19:10 2013
@@ -65,6 +65,11 @@ ConnectionObserver::ObserverPtr Connecti
     return observer;
 }
 
+void ConnectionObserver::reset() {
+    sys::Mutex::ScopedLock l(lock);
+    observer.reset();
+}
+
 bool ConnectionObserver::isSelf(const broker::Connection& connection) {
     BrokerInfo info;
     return getBrokerInfo(connection, info) && info.getSystemId() == self;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h Mon Jun 17 14:19:10 2013
@@ -62,6 +62,8 @@ class ConnectionObserver : public broker
     void setObserver(const ObserverPtr&, const std::string& logPrefix);
     ObserverPtr getObserver();
 
+    void reset();
+
     void opened(broker::Connection& connection);
     void closed(broker::Connection& connection);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon Jun 17 14:19:10 2013
@@ -27,7 +27,10 @@
 #include "ReplicatingSubscription.h"
 #include "Settings.h"
 #include "StandAlone.h"
+#include "QueueSnapshot.h"
+#include "QueueSnapshots.h"
 #include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/assert.h"
 #include "qpid/Exception.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Link.h"
@@ -65,7 +68,8 @@ HaBroker::HaBroker(broker::Broker& b, co
       observer(new ConnectionObserver(*this, systemId)),
       role(new StandAlone),
       membership(BrokerInfo(systemId, STANDALONE), *this),
-      failoverExchange(new FailoverExchange(*b.GetVhostObject(), b))
+      failoverExchange(new FailoverExchange(*b.GetVhostObject(), b)),
+      queueSnapshots(shared_ptr<QueueSnapshots>(new QueueSnapshots))
 {
     // If we are joining a cluster we must start excluding clients now,
     // otherwise there's a window for a client to connect before we get to
@@ -77,6 +81,8 @@ HaBroker::HaBroker(broker::Broker& b, co
         broker.getConnectionObservers().add(observer);
         broker.getExchanges().registerExchange(failoverExchange);
     }
+    // QueueSnapshots are needed for standalone replication as well as cluster.
+    broker.getConfigurationObservers().add(queueSnapshots);
 }
 
 namespace {
@@ -86,8 +92,10 @@ bool isNone(const std::string& x) { retu
 
 // Called in Plugin::initialize
 void HaBroker::initialize() {
-    if (settings.cluster) membership.setStatus(JOINING);
-    QPID_LOG(notice, "Initializing: " << membership.getInfo());
+    if (settings.cluster) {
+        membership.setStatus(JOINING);
+        QPID_LOG(notice, "Initializing HA broker: " << membership.getInfo());
+    }
 
     // Set up the management object.
     ManagementAgent* ma = broker.getManagementAgent();
@@ -103,7 +111,7 @@ void HaBroker::initialize() {
     // Register a factory for replicating subscriptions.
     broker.getConsumerFactories().add(
         shared_ptr<ReplicatingSubscription::Factory>(
-            new ReplicatingSubscription::Factory()));
+            new ReplicatingSubscription::Factory(*this)));
 
     // If we are in a cluster, start as backup in joining state.
     if (settings.cluster) {
@@ -205,9 +213,12 @@ BrokerStatus HaBroker::getStatus() const
 
 void HaBroker::setAddress(const Address& a) {
     QPID_LOG(info, role->getLogPrefix() << "Set self address to: " << a);
-    BrokerInfo b(membership.getSelf(), membership.getStatus(), a);
-    membership.add(b);
+    membership.setAddress(a);
 }
 
+boost::shared_ptr<QueueReplicator> HaBroker::findQueueReplicator(const std::string& queueName) {
+    return boost::dynamic_pointer_cast<QueueReplicator>(
+        broker.getExchanges().find(QueueReplicator::replicatorName(queueName)));
+}
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Mon Jun 17 14:19:10 2013
@@ -54,6 +54,10 @@ class Backup;
 class ConnectionObserver;
 class Primary;
 class Role;
+class QueueSnapshot;
+class QueueSnapshots;
+class QueueReplicator;
+
 /**
  * HA state and actions associated with a HA broker. Holds all the management info.
  *
@@ -93,8 +97,11 @@ class HaBroker : public management::Mana
 
     void setAddress(const Address&); // set self address from a self-connection
 
-  private:
+    boost::shared_ptr<QueueSnapshots> getQueueSnapshots() { return queueSnapshots; }
 
+    boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName);
+
+  private:
     void setPublicUrl(const Url&);
     void setBrokerUrl(const Url&);
     void updateClientUrl(sys::Mutex::ScopedLock&);
@@ -117,6 +124,7 @@ class HaBroker : public management::Mana
     boost::shared_ptr<Role> role;
     Membership membership;
     boost::shared_ptr<FailoverExchange> failoverExchange;
+    boost::shared_ptr<QueueSnapshots> queueSnapshots;
 };
 }} // namespace qpid::ha
 

Added: qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h?rev=1493771&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h Mon Jun 17 14:19:10 2013
@@ -0,0 +1,62 @@
+#ifndef QPID_HA_IDSETTER_H
+#define QPID_HA_IDSETTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageInterceptor.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/AtomicValue.h"
+
+
+namespace qpid {
+namespace ha {
+
+/**
+ * A MessageInterceptor that sets the ReplicationId on each message as it is
+ * enqueued on a primary queue.
+ *
+ * THREAD UNSAFE: Called sequentially under the queue lock.
+ */
+class IdSetter : public broker::MessageInterceptor
+{
+  public:
+    IdSetter(const std::string& q, ReplicationId firstId) : nextId(firstId), name(q) {
+        QPID_LOG(trace, "Initial replication ID for " << name << " is " << nextId.get());
+    }
+
+    void record(broker::Message& m) {
+        m.setReplicationId(nextId++);
+        QPID_LOG(trace, "Recorded replication ID " << m.getReplicationId() << " on " << name);
+    }
+
+  private:
+    sys::AtomicValue<uint32_t> nextId;
+    std::string name;
+};
+
+}} // namespace qpid::ha
+
+#endif  /*!QPID_HA_IDSETTER_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp Mon Jun 17 14:19:10 2013
@@ -18,9 +18,11 @@
  * under the License.
  *
  */
-#include "Membership.h"
+#include "ConnectionObserver.h"
 #include "HaBroker.h"
+#include "Membership.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/framing/FieldTable.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/types/Variant.h"
 #include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
@@ -109,11 +111,25 @@ bool Membership::get(const types::Uuid& 
 
 void Membership::update(Mutex::ScopedLock& l) {
     QPID_LOG(info, "Membership: " <<  brokers);
-    Variant::List brokers = asList();
+    // Update management and send update event.
+    Variant::List brokerList = asList();
     if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str());
-    if (mgmtObject) mgmtObject->set_members(brokers);
+    if (mgmtObject) mgmtObject->set_members(brokerList);
     haBroker.getBroker().getManagementAgent()->raiseEvent(
-        _qmf::EventMembersUpdate(brokers));
+        _qmf::EventMembersUpdate(brokerList));
+
+    // Update link client properties
+    framing::FieldTable linkProperties = haBroker.getBroker().getLinkClientProperties();
+    if (isBackup(getStatus(l))) {
+        // Set backup tag on outgoing link properties.
+        linkProperties.setTable(
+            ConnectionObserver::BACKUP_TAG, brokers[types::Uuid(self)].asFieldTable());
+        haBroker.getBroker().setLinkClientProperties(linkProperties);
+    } else {
+        // Remove backup tag property from outgoing link properties.
+        linkProperties.erase(ConnectionObserver::BACKUP_TAG);
+        haBroker.getBroker().setLinkClientProperties(linkProperties);
+    }
 }
 
 void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) {
@@ -178,5 +194,10 @@ BrokerInfo Membership::getInfo() const  
     return i->second;
 }
 
-// FIXME aconway 2013-01-23: move to .h?
+void Membership::setAddress(const Address& a) {
+    Mutex::ScopedLock l(lock);
+    brokers[self].setAddress(a);
+    update(l);
+}
+
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h Mon Jun 17 14:19:10 2013
@@ -78,6 +78,7 @@ class Membership
     BrokerInfo getInfo() const;
     BrokerStatus getStatus() const;
     void setStatus(BrokerStatus s);
+    void setAddress(const Address&);
 
   private:
     void update(sys::Mutex::ScopedLock&);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Mon Jun 17 14:19:10 2013
@@ -22,9 +22,11 @@
 #include "HaBroker.h"
 #include "Primary.h"
 #include "ReplicationTest.h"
+#include "IdSetter.h"
 #include "ReplicatingSubscription.h"
 #include "RemoteBackup.h"
 #include "ConnectionObserver.h"
+#include "QueueReplicator.h"
 #include "qpid/assert.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/ConfigurationObserver.h"
@@ -41,6 +43,7 @@ namespace qpid {
 namespace ha {
 
 using sys::Mutex;
+using boost::shared_ptr;
 using namespace std;
 using namespace framing;
 
@@ -87,6 +90,8 @@ Primary::Primary(HaBroker& hb, const Bro
     replicationTest(hb.getSettings().replicateDefault.get())
 {
     hb.getMembership().setStatus(RECOVERING);
+    broker::QueueRegistry& queues = hb.getBroker().getQueues();
+    queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1));
     assert(instance == 0);
     instance = this;            // Let queue replicators find us.
     if (expect.empty()) {
@@ -101,25 +106,16 @@ Primary::Primary(HaBroker& hb, const Bro
             boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0));
             backups[i->getSystemId()] = backup;
             if (!backup->isReady()) expectedBackups.insert(backup);
-            backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards
+            setCatchupQueues(backup, true); // Create guards
         }
         // Set timeout for expected brokers to connect and become ready.
         sys::AbsTime deadline(sys::now(), hb.getSettings().backupTimeout);
         timerTask = new ExpectedBackupTimerTask(*this, deadline);
         hb.getBroker().getTimer().add(timerTask);
     }
-
-
-    // Remove backup tag property from outgoing link properties.
-    framing::FieldTable linkProperties = hb.getBroker().getLinkClientProperties();
-    linkProperties.erase(ConnectionObserver::BACKUP_TAG);
-    hb.getBroker().setLinkClientProperties(linkProperties);
-
     configurationObserver.reset(new PrimaryConfigurationObserver(*this));
     haBroker.getBroker().getConfigurationObservers().add(configurationObserver);
-
-    Mutex::ScopedLock l(lock);  // We are now active as a configurationObserver
-    checkReady(l);
+    checkReady();               // Outside lock
 
     // Allow client connections
     connectionObserver.reset(new PrimaryConnectionObserver(*this));
@@ -129,29 +125,48 @@ Primary::Primary(HaBroker& hb, const Bro
 Primary::~Primary() {
     if (timerTask) timerTask->cancel();
     haBroker.getBroker().getConfigurationObservers().remove(configurationObserver);
+    haBroker.getObserver()->reset();
+}
+
+void Primary::initializeQueue(boost::shared_ptr<broker::Queue> q) {
+    if (replicationTest.useLevel(*q) == ALL) {
+        boost::shared_ptr<QueueReplicator> qr = haBroker.findQueueReplicator(q->getName());
+        ReplicationId firstId = qr ? qr->getMaxId()+1 : ReplicationId(1);
+        q->getMessageInterceptors().add(
+            boost::shared_ptr<IdSetter>(new IdSetter(q->getName(), firstId)));
+    }
 }
 
-void Primary::checkReady(Mutex::ScopedLock&) {
-    if (!active && expectedBackups.empty()) {
-        active = true;
-        Mutex::ScopedUnlock u(lock); // Don't hold lock across callback
+void Primary::checkReady() {
+    bool activate = false;
+    {
+        Mutex::ScopedLock l(lock);
+        if (!active && expectedBackups.empty())
+            activate = active = true;
+    }
+    if (activate) {
         QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active.");
-        membership.setStatus(ACTIVE);
+        membership.setStatus(ACTIVE); // Outside of lock.
     }
 }
 
-void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l)  {
-    if (i != backups.end() && i->second->reportReady()) {
-        BrokerInfo info = i->second->getBrokerInfo();
-        info.setStatus(READY);
-        membership.add(info);
-        if (expectedBackups.erase(i->second)) {
-            QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
-            checkReady(l);
-        }
+void Primary::checkReady(boost::shared_ptr<RemoteBackup> backup) {
+    bool ready = false;
+    {
+        Mutex::ScopedLock l(lock);
+        if (backup->reportReady()) {
+            BrokerInfo info = backup->getBrokerInfo();
+            info.setStatus(READY);
+            membership.add(info);
+            if (expectedBackups.erase(backup)) {
+                QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
+                ready = true;
+            }
         else
             QPID_LOG(info, logPrefix << "New backup is ready: " << info);
+        }
     }
+    if (ready) checkReady(); // Outside lock
 }
 
 void Primary::timeoutExpectedBackups() {
@@ -162,35 +177,39 @@ void Primary::timeoutExpectedBackups() {
         // Allow backups that are connected to continue becoming ready.
         for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end();)
         {
-            boost::shared_ptr<RemoteBackup> rb = *i;
-            if (!rb->isConnected()) {
-                BrokerInfo info = rb->getBrokerInfo();
+            // backupDisconnect erases this element from expectedBackups, so
+            // save and increment the iterator first.
+            BackupSet::iterator j = i++;
+            boost::shared_ptr<RemoteBackup> backup = *j;
+            if (!backup->getConnection()) {
+                BrokerInfo info = backup->getBrokerInfo();
                 QPID_LOG(error, logPrefix << "Expected backup timed out: " << info);
-                expectedBackups.erase(i++);
-                backups.erase(info.getSystemId());
-                rb->cancel();
-                // Downgrade the broker's status to CATCHUP
+                backupDisconnect(backup, l); // Calls erase(j)
+                // Keep broker in membership but downgrade status to CATCHUP.
                 // The broker will get this status change when it eventually connects.
                 info.setStatus(CATCHUP);
                 membership.add(info);
             }
-            else ++i;
         }
-        checkReady(l);
     }
     catch(const std::exception& e) {
         QPID_LOG(error, logPrefix << "Error timing out backups: " << e.what());
         // No-where for this exception to go.
     }
+    checkReady();
 }
 
 void Primary::readyReplica(const ReplicatingSubscription& rs) {
-    sys::Mutex::ScopedLock l(lock);
-    BackupMap::iterator i = backups.find(rs.getBrokerInfo().getSystemId());
-    if (i != backups.end()) {
-        i->second->ready(rs.getQueue());
-        checkReady(i, l);
+    shared_ptr<RemoteBackup> backup;
+    {
+        sys::Mutex::ScopedLock l(lock);
+        BackupMap::iterator i = backups.find(rs.getBrokerInfo().getSystemId());
+        if (i != backups.end()) {
+            backup = i->second;
+            backup->ready(rs.getQueue());
+        }
     }
+    if (backup) checkReady(backup);
 }
 
 // NOTE: Called with queue registry lock held.
@@ -201,23 +220,28 @@ void Primary::queueCreate(const QueuePtr
              << " replication: " << printable(level));
     q->addArgument(QPID_REPLICATE, printable(level).str());
     if (level) {
-        // Give each queue a unique id to avoid confusion of same-named queues.
+        initializeQueue(q);
+        // Give each queue a unique id. Used by backups to avoid confusion of
+        // same-named queues.
         q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
-        Mutex::ScopedLock l(lock);
-        for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
-            i->second->queueCreate(q);
-            checkReady(i, l);
+        {
+            Mutex::ScopedLock l(lock);
+            for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
+                i->second->queueCreate(q);
         }
+        checkReady();           // Outside lock
     }
 }
 
 // NOTE: Called with queue registry lock held.
 void Primary::queueDestroy(const QueuePtr& q) {
-        QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
-    Mutex::ScopedLock l(lock);
-    for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
-        i->second->queueDestroy(q);
-    checkReady(l);
+    QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
+    {
+        Mutex::ScopedLock l(lock);
+        for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
+            i->second->queueDestroy(q);
+    }
+    checkReady();               // Outside lock
 }
 
 // NOTE: Called with exchange registry lock held.
@@ -240,56 +264,88 @@ void Primary::exchangeDestroy(const Exch
     // Do nothing
  }
 
+// New backup connected
+shared_ptr<RemoteBackup> Primary::backupConnect(
+    const BrokerInfo& info, broker::Connection& connection, Mutex::ScopedLock&)
+{
+    shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection));
+    backups[info.getSystemId()] = backup;
+    return backup;
+}
+
+// Remove a backup. Caller should not release its shared pointer to the backup
+// until it is outside the lock.
+void Primary::backupDisconnect(shared_ptr<RemoteBackup> backup, Mutex::ScopedLock&) {
+    types::Uuid id = backup->getBrokerInfo().getSystemId();
+    backup->cancel();
+    expectedBackups.erase(backup);
+    backups.erase(id);
+}
+
+
 void Primary::opened(broker::Connection& connection) {
     BrokerInfo info;
+    shared_ptr<RemoteBackup> backup;
     if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
         Mutex::ScopedLock l(lock);
         BackupMap::iterator i = backups.find(info.getSystemId());
         if (i == backups.end()) {
-            QPID_LOG(info, logPrefix << "New backup connected: " << info);
-            boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection));
-            {
-                // Avoid deadlock with queue registry lock.
-                Mutex::ScopedUnlock u(lock);
-                backup->setCatchupQueues(haBroker.getBroker().getQueues(), false);
-            }
-            backups[info.getSystemId()] = backup;
-            i = backups.find(info.getSystemId());
+            QPID_LOG(info, logPrefix << "New backup connection: " << info);
+            backup = backupConnect(info, connection, l);
         }
-        else {
-            QPID_LOG(info, logPrefix << "Known backup connected: " << info);
+        else if (i->second->getConnection()) {
+            // The backup is failing over before we received the closed() call
+            // for its previous connection. Remove the old entry and create a new one.
+            QPID_LOG(error, logPrefix << "Known backup reconnect before disconnection: " << info);
+            backupDisconnect(i->second, l);
+            backup = backupConnect(info, connection, l);
+        } else {
+            QPID_LOG(info, logPrefix << "Known backup reconnection: " << info);
             i->second->setConnection(&connection);
         }
         if (info.getStatus() == JOINING) {
             info.setStatus(CATCHUP);
             membership.add(info);
         }
-        if (i != backups.end()) checkReady(i, l);
     }
     else
-        QPID_LOG(debug, logPrefix << "Accepted client connection "
-                 << connection.getMgmtId());
+        QPID_LOG(debug, logPrefix << "Accepted client connection " << connection.getMgmtId());
+
+    // Outside lock
+    if (backup) {
+        setCatchupQueues(backup, false);
+        checkReady(backup);
+    }
+    checkReady();
 }
 
 void Primary::closed(broker::Connection& connection) {
     BrokerInfo info;
+    shared_ptr<RemoteBackup> backup;
     if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
         Mutex::ScopedLock l(lock);
         BackupMap::iterator i = backups.find(info.getSystemId());
         // NOTE: It is possible for a backup connection to be rejected while we
         // are a backup, but closed() is called after we have become primary.
         // Checking  isConnected() lets us ignore such spurious closes.
-        if (i != backups.end() && i->second->isConnected()) {
-            QPID_LOG(info, logPrefix << "Backup disconnected: " << info);
-            membership.remove(info.getSystemId());
-            expectedBackups.erase(i->second);
-            backups.erase(i);
-            checkReady(l);
+        if (i == backups.end()) {
+            QPID_LOG(info, "Disconnect from unknown backup " << info);
+        }
+        else if (i->second->getConnection() != &connection) {
+            QPID_LOG(info, logPrefix << "Late disconnect from backup " << info);
+        }
+        else {
+            QPID_LOG(info, logPrefix << "Disconnect from "
+                     << (i->second->getConnection() ? "" : "disconnected ")
+                     << "backup " << info);
+            // Assign to shared_ptr so it will be deleted after we release the lock.
+            backup = i->second;
+            backupDisconnect(backup, l);
         }
     }
+    checkReady();
 }
 
-
 boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerInfo& info)
 {
     Mutex::ScopedLock l(lock);
@@ -302,4 +358,11 @@ Role* Primary::promote() {
     return 0;
 }
 
+void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards) {
+    // Do queue iteration outside the lock to avoid deadlocks with QueueRegistry.
+    haBroker.getBroker().getQueues().eachQueue(
+        boost::bind(&RemoteBackup::catchupQueue, backup, _1, createGuards));
+    backup->startCatchup();
+}
+
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h Mon Jun 17 14:19:10 2013
@@ -27,9 +27,9 @@
 #include "ReplicationTest.h"
 #include "Role.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/sys/unordered_map.h"
 #include <boost/shared_ptr.hpp>
 #include <boost/intrusive_ptr.hpp>
-#include <map>
 #include <string>
 
 namespace qpid {
@@ -64,6 +64,7 @@ class Primary : public Role
   public:
     typedef boost::shared_ptr<broker::Queue> QueuePtr;
     typedef boost::shared_ptr<broker::Exchange> ExchangePtr;
+    typedef boost::shared_ptr<RemoteBackup> RemoteBackupPtr;
 
     static Primary* get() { return instance; }
 
@@ -94,11 +95,18 @@ class Primary : public Role
     void timeoutExpectedBackups();
 
   private:
-    typedef std::map<types::Uuid, boost::shared_ptr<RemoteBackup> > BackupMap;
-    typedef std::set<boost::shared_ptr<RemoteBackup> > BackupSet;
+    typedef qpid::sys::unordered_map<
+      types::Uuid, RemoteBackupPtr, types::Uuid::Hasher > BackupMap;
 
-    void checkReady(sys::Mutex::ScopedLock&);
-    void checkReady(BackupMap::iterator, sys::Mutex::ScopedLock&);
+    typedef std::set<RemoteBackupPtr > BackupSet;
+
+    RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&);
+    void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&);
+
+    void initializeQueue(boost::shared_ptr<broker::Queue>);
+    void checkReady();
+    void checkReady(RemoteBackupPtr);
+    void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);
 
     sys::Mutex lock;
     HaBroker& haBroker;
@@ -120,7 +128,6 @@ class Primary : public Role
     boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
     boost::shared_ptr<broker::ConfigurationObserver> configurationObserver;
     boost::intrusive_ptr<sys::TimerTask> timerTask;
-
     static Primary* instance;
 };
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Mon Jun 17 14:19:10 2013
@@ -19,7 +19,7 @@
  *
  */
 #include "QueueGuard.h"
-#include "ReplicatingSubscription.h"
+#include "BrokerInfo.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueuedMessage.h"
 #include "qpid/broker/QueueObserver.h"
@@ -32,8 +32,6 @@ namespace ha {
 
 using namespace broker;
 using sys::Mutex;
-using framing::SequenceNumber;
-using framing::SequenceSet;
 
 class QueueGuard::QueueObserver : public broker::QueueObserver
 {
@@ -50,15 +48,19 @@ class QueueGuard::QueueObserver : public
 
 
 QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
-    : cancelled(false), queue(q), subscription(0)
+    : cancelled(false), queue(q)
 {
     std::ostringstream os;
-    os << "Primary guard " << queue.getName() << "@" << info << ": ";
+    os << "Guard of " << queue.getName() << " at " << info << ": ";
     logPrefix = os.str();
     observer.reset(new QueueObserver(*this));
     queue.addObserver(observer);
-    // Set range after addObserver so we know that range.back+1 is a guarded position.
-    range = QueueRange(q);
+    // Set first after calling addObserver so we know that the back of the
+    // queue+1 is (or will be) a guarded position.
+    QueuePosition front, back;
+    q.getRange(front, back, broker::REPLICATOR);
+    first = back + 1;
+    QPID_LOG(debug, logPrefix << "First guarded position " << first);
 }
 
 QueueGuard::~QueueGuard() { cancel(); }
@@ -66,20 +68,20 @@ QueueGuard::~QueueGuard() { cancel(); }
 // NOTE: Called with message lock held.
 void QueueGuard::enqueued(const Message& m) {
     // Delay completion
-    QPID_LOG(trace, logPrefix << "Delayed completion of " << m.getSequence());
+    ReplicationId id = m.getReplicationId();
+    QPID_LOG(trace, logPrefix << "Delayed completion of " << LogMessageId(queue, m));
     Mutex::ScopedLock l(lock);
     if (cancelled) return;  // Don't record enqueues after we are cancelled.
-    assert(delayed.find(m.getSequence()) == delayed.end());
-    delayed[m.getSequence()] = m.getIngressCompletion();
+    delayed[id] = m.getIngressCompletion();
     m.getIngressCompletion()->startCompleter();
 }
 
 // NOTE: Called with message lock held.
 void QueueGuard::dequeued(const Message& m) {
-    QPID_LOG(trace, logPrefix << "Dequeued " << m);
+    ReplicationId id = m.getReplicationId();
+    QPID_LOG(trace, logPrefix << "Dequeued "  << LogMessageId(queue, m));
     Mutex::ScopedLock l(lock);
-    if (subscription) subscription->dequeued(m);
-    complete(m.getSequence(), l);
+    complete(id, l);
 }
 
 void QueueGuard::cancel() {
@@ -87,48 +89,31 @@ void QueueGuard::cancel() {
     Mutex::ScopedLock l(lock);
     if (cancelled) return;
     cancelled = true;
-    for (Delayed::iterator i = delayed.begin(); i != delayed.end();) {
-        complete(i, l);
-        delayed.erase(i++);
-    }
+    while (!delayed.empty()) complete(delayed.begin(), l);
 }
 
-void QueueGuard::attach(ReplicatingSubscription& rs) {
+bool QueueGuard::complete(ReplicationId id) {
     Mutex::ScopedLock l(lock);
-    subscription = &rs;
+    return complete(id, l);
 }
 
-bool QueueGuard::subscriptionStart(SequenceNumber position) {
-    // Complete any messages before or at the ReplicatingSubscription start position.
-    // Those messages are already on the backup.
-    Mutex::ScopedLock l(lock);
-    Delayed::iterator i = delayed.begin();
-    while(i != delayed.end() && i->first <= position) {
-        complete(i, l);
-        delayed.erase(i++);
-    }
-    return position >= range.back;
-}
-
-void QueueGuard::complete(SequenceNumber sequence) {
-    Mutex::ScopedLock l(lock);
-    complete(sequence, l);
-}
-
-void QueueGuard::complete(SequenceNumber sequence, Mutex::ScopedLock& l) {
+bool QueueGuard::complete(ReplicationId id, Mutex::ScopedLock& l) {
     // The same message can be completed twice, by
     // ReplicatingSubscription::acknowledged and dequeued. Remove it
     // from the map so we only call finishCompleter() once
-    Delayed::iterator i = delayed.find(sequence);
+    Delayed::iterator i = delayed.find(id);
     if (i != delayed.end()) {
         complete(i, l);
-        delayed.erase(i);
+        return true;
     }
+    return false;
 }
 
 void QueueGuard::complete(Delayed::iterator i, Mutex::ScopedLock&) {
     QPID_LOG(trace, logPrefix << "Completed " << i->first);
     i->second->finishCompleter();
+    delayed.erase(i);
 }
 
+
 }} // namespaces qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h Mon Jun 17 14:19:10 2013
@@ -23,12 +23,12 @@
  */
 
 #include "types.h"
-#include "QueueRange.h"
-#include "qpid/framing/SequenceNumber.h"
-#include "qpid/framing/SequenceSet.h"
+#include "hash.h"
 #include "qpid/types/Uuid.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/sys/unordered_map.h"
 #include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
 #include <deque>
 #include <set>
 
@@ -37,6 +37,7 @@ namespace broker {
 class Queue;
 struct QueuedMessage;
 class Message;
+class AsyncCompletion;
 }
 
 namespace ha {
@@ -53,10 +54,8 @@ class ReplicatingSubscription;
  *
  * THREAD SAFE: Concurrent calls:
  *  - enqueued() via QueueObserver in arbitrary connection threads.
- *  - attach(), cancel(), complete() from ReplicatingSubscription in subscription thread.
+ *  - cancel(), complete() from ReplicatingSubscription in subscription thread.
  *
- * Lock Hierarchy: ReplicatingSubscription MUS NOT call QueueGuard with it's lock held
- * QueueGuard MAY call ReplicatingSubscription with it's lock held.
  */
 class QueueGuard {
   public:
@@ -73,54 +72,35 @@ class QueueGuard {
      */
     void dequeued(const broker::Message&);
 
-    /** Complete a delayed message. */
-    void complete(framing::SequenceNumber);
+    /** Complete a delayed message.
+     *@return true if the ID was delayed
+     */
+    bool complete(ReplicationId);
 
     /** Complete all delayed messages. */
     void cancel();
 
-    void attach(ReplicatingSubscription&);
-
-    /**
-     * Return the un-guarded queue range at the time the QueueGuard was created.
-     *
-     * The first position guaranteed to be protected by the guard is
-     * getRange().getBack()+1. It is possible that the guard has protected some
-     * messages before that point. Any such messages are dealt with in subscriptionStart
-     *
-     * The QueueGuard is created in 3 situations
-     * - when a backup is promoted, guards are created for expected backups.
-     * - when a new queue is created on the primary
-     * - when a new backup joins.
-     *
-     * In the last situation the queue is active while the guard is being
-     * created.
-     *
-     */
-    const QueueRange& getRange() const { return range; } // range is immutable, no lock needed.
-
-    /** Inform the guard of the stating position for the attached subscription.
-     * Complete messages that will not be seen by the subscription.
-     *@return true if the subscription has already advanced to a guarded position.
+    /** Return the first known guarded position on the queue.  It is possible
+     * that the guard has seen a few messages before this point.
      */
-    bool subscriptionStart(framing::SequenceNumber position);
+    QueuePosition getFirst() const { return first; } // Thread safe: Immutable.
 
   private:
     class QueueObserver;
-    typedef std::map<framing::SequenceNumber,
-                     boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
+    typedef qpid::sys::unordered_map<ReplicationId,
+                                     boost::intrusive_ptr<broker::AsyncCompletion>,
+                                     TrivialHasher<ReplicationId> > Delayed;
 
-    void complete(framing::SequenceNumber, sys::Mutex::ScopedLock &);
+    bool complete(ReplicationId, sys::Mutex::ScopedLock &);
     void complete(Delayed::iterator, sys::Mutex::ScopedLock &);
 
     sys::Mutex lock;
+    QueuePosition first;
     bool cancelled;
     std::string logPrefix;
     broker::Queue& queue;
     Delayed delayed;
-    ReplicatingSubscription* subscription;
     boost::shared_ptr<QueueObserver> observer;
-    QueueRange range;
 };
 }} // namespace qpid::ha
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Mon Jun 17 14:19:10 2013
@@ -19,8 +19,10 @@
  *
  */
 
+#include "makeMessage.h"
 #include "HaBroker.h"
 #include "QueueReplicator.h"
+#include "QueueSnapshots.h"
 #include "ReplicatingSubscription.h"
 #include "Settings.h"
 #include "qpid/broker/Bridge.h"
@@ -31,10 +33,10 @@
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/SessionHandler.h"
 #include "qpid/broker/SessionHandler.h"
-#include "qpid/framing/SequenceSet.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/log/Statement.h"
 #include "qpid/Msg.h"
+#include "qpid/assert.h"
 #include <boost/shared_ptr.hpp>
 
 namespace {
@@ -51,7 +53,7 @@ using namespace std;
 using sys::Mutex;
 
 const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA+"dequeue");
-const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA+"position");
+const std::string QueueReplicator::ID_EVENT_KEY(QPID_HA+"id");
 const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
 
 std::string QueueReplicator::replicatorName(const std::string& queueName) {
@@ -107,10 +109,12 @@ QueueReplicator::QueueReplicator(HaBroke
                                  boost::shared_ptr<Link> l)
     : Exchange(replicatorName(q->getName()), 0, q->getBroker()),
       haBroker(hb),
-      logPrefix("Backup queue "+q->getName()+": "),
+      logPrefix("Backup of "+q->getName()+": "),
       queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false),
-      settings(hb.getSettings()), destroyed(false)
+      settings(hb.getSettings()), destroyed(false),
+      nextId(0), maxId(0)
 {
+    QPID_LOG(debug, logPrefix << "Created");
     args.setString(QPID_REPLICATE, printable(NONE).str());
     Uuid uuid(true);
     bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -162,13 +166,12 @@ QueueReplicator::~QueueReplicator() {}
 
 // Called from Queue::destroyed()
 void QueueReplicator::destroy() {
-    QPID_LOG(debug, logPrefix << " destroyed");
     boost::shared_ptr<Bridge> bridge2; // To call outside of lock
     {
         Mutex::ScopedLock l(lock);
         if (destroyed) return;
         destroyed = true;
-        QPID_LOG(debug, logPrefix << "Destroyed.");
+        QPID_LOG(debug, logPrefix << "Destroyed");
         // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
         queue.reset();
         link.reset();
@@ -188,12 +191,10 @@ void QueueReplicator::initializeBridge(B
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
     FieldTable arguments;
     arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
-    arguments.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
-    arguments.setInt(ReplicatingSubscription::QPID_BACK, queue->getPosition());
-    arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO,brokerInfo.asFieldTable());
-    SequenceNumber front, back;
-    queue->getRange(front, back, broker::REPLICATOR);
-    if (front <= back) arguments.setInt(ReplicatingSubscription::QPID_FRONT, front);
+    arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
+    arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
+    arguments.setString(ReplicatingSubscription::QPID_ID_SET,
+                        encodeStr(haBroker.getQueueSnapshots()->get(queue)->snapshot()));
     try {
         peer.getMessage().subscribe(
             args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
@@ -222,51 +223,36 @@ template <class T> T decodeContent(Messa
 }
 }
 
-void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) {
-    if (destroyed) return;
-    queue->dequeueMessageAt(n);
-}
-
-namespace {
-bool getSequence(const Message& message, SequenceNumber& result) {
-    result = message.getSequence();
-    return true;
-}
-bool getNext(broker::Queue& q, SequenceNumber position, SequenceNumber& result) {
-    QueueCursor cursor(REPLICATOR);
-    return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), position+1);
+void QueueReplicator::dequeue(const ReplicationIdSet& dequeues, Mutex::ScopedLock&) {
+    QPID_LOG(trace, logPrefix << "Dequeue " << dequeues);
+    //TODO: should be able to optimise the following
+    for (ReplicationIdSet::iterator i = dequeues.begin(); i != dequeues.end(); ++i) {
+        PositionMap::iterator j = positions.find(*i);
+        if (j != positions.end()) queue->dequeueMessageAt(j->second);
+    }
 }
-} // namespace
 
 // Called in connection thread of the queues bridge to primary.
 void QueueReplicator::route(Deliverable& msg)
 {
     try {
-        const std::string& key = msg.getMessage().getRoutingKey();
         Mutex::ScopedLock l(lock);
         if (destroyed) return;
-        if (!isEventKey(key)) {
+        const std::string& key = msg.getMessage().getRoutingKey();
+        if (!isEventKey(key)) { // Replicated message
+            ReplicationId id = nextId++;
+            maxId = std::max(maxId, id);
+            msg.getMessage().setReplicationId(id);
             msg.deliverTo(queue);
-            // We are on a backup so the queue is not modified except via this.
-            QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
+            QueuePosition position = queue->getPosition();
+            positions[id] = position;
+            QPID_LOG(trace, logPrefix << "Enqueued " << LogMessageId(*queue,position,id));
         }
         else if (key == DEQUEUE_EVENT_KEY) {
-            SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
-            QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
-            //TODO: should be able to optimise the following
-            for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
-                dequeue(*i, l);
+            dequeue(decodeContent<ReplicationIdSet>(msg.getMessage()), l);
         }
-        else if (key == POSITION_EVENT_KEY) {
-            SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
-            QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
-                     << " to " << position);
-            // Verify that there are no messages after the new position in the queue.
-            SequenceNumber next;
-            if (getNext(*queue, position, next))
-                throw Exception(QPID_MSG(logPrefix << "Invalid position " << position
-                                         << " preceeds message at " << next));
-            queue->setPosition(position);
+        else if (key == ID_EVENT_KEY) {
+            nextId = decodeContent<ReplicationId>(msg.getMessage());
         }
         // Ignore unknown event keys, may be introduced in later versions.
     }
@@ -275,6 +261,11 @@ void QueueReplicator::route(Deliverable&
     }
 }
 
+ReplicationId QueueReplicator::getMaxId() {
+    Mutex::ScopedLock l(lock);
+    return maxId;
+}
+
 // Unused Exchange methods.
 bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
 bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Mon Jun 17 14:19:10 2013
@@ -23,8 +23,8 @@
  */
 
 #include "BrokerInfo.h"
+#include "hash.h"
 #include "qpid/broker/Exchange.h"
-#include "qpid/framing/SequenceSet.h"
 #include <boost/enable_shared_from_this.hpp>
 #include <iosfwd>
 
@@ -57,7 +57,7 @@ class QueueReplicator : public broker::E
 {
   public:
     static const std::string DEQUEUE_EVENT_KEY;
-    static const std::string POSITION_EVENT_KEY;
+    static const std::string ID_EVENT_KEY;
     static const std::string QPID_SYNC_FREQUENCY;
 
     static std::string replicatorName(const std::string& queueName);
@@ -87,13 +87,17 @@ class QueueReplicator : public broker::E
 
     boost::shared_ptr<broker::Queue> getQueue() const { return queue; }
 
+    ReplicationId getMaxId();
+
   private:
+    typedef qpid::sys::unordered_map<ReplicationId, QueuePosition, TrivialHasher<int32_t> > PositionMap;
+
     class ErrorListener;
     class QueueObserver;
 
     void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
     void destroy();             // Called when the queue is destroyed.
-    void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&);
+    void dequeue(const ReplicationIdSet&, sys::Mutex::ScopedLock&);
 
     HaBroker& haBroker;
     std::string logPrefix;
@@ -106,8 +110,13 @@ class QueueReplicator : public broker::E
     bool subscribed;
     const Settings& settings;
     bool destroyed;
+    PositionMap positions;
+    ReplicationIdSet idSet; // Set of replicationIds on the queue.
+    ReplicationId nextId;   // ID for next message to arrive.
+    ReplicationId maxId;    // Max ID used so far.
 };
 
+
 }} // namespace qpid::ha
 
 #endif  /*!QPID_HA_QUEUEREPLICATOR_H*/

Copied: qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshot.h (from r1493713, qpid/trunk/qpid/cpp/src/qpid/broker/Observers.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshot.h?p2=qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshot.h&p1=qpid/trunk/qpid/cpp/src/qpid/broker/Observers.h&r1=1493713&r2=1493771&rev=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Observers.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshot.h Mon Jun 17 14:19:10 2013
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_OBSERVERS_H
-#define QPID_BROKER_OBSERVERS_H
+#ifndef QPID_HA_IDSETOBSERVER_H
+#define QPID_HA_IDSETOBSERVER_H
 
 /*
  *
@@ -22,48 +22,47 @@
  *
  */
 
+#include "qpid/broker/Message.h"
+#include "qpid/broker/QueueObserver.h"
 #include "qpid/sys/Mutex.h"
-#include <boost/shared_ptr.hpp>
-#include <vector>
-#include <algorithm>
 
 namespace qpid {
-namespace broker {
+namespace ha {
 
 /**
- * Base class for collections of observers with thread-safe add/remove and traversal.
+ * A QueueObserver that maintains a ReplicationIdSet of the ReplicationIds of
+ * the messages on the queue.
+ *
+ * THREAD SAFE: Note that QueueObserver methods are called under the Queue's messageLock.
+ *
  */
-template <class Observer>
-class Observers
+class  QueueSnapshot : public broker::QueueObserver
 {
   public:
-    void add(boost::shared_ptr<Observer> observer) {
+    void enqueued(const broker::Message& m) {
         sys::Mutex::ScopedLock l(lock);
-        observers.push_back(observer);
+        set += m.getReplicationId();
     }
 
-    void remove(boost::shared_ptr<Observer> observer) {
+    void dequeued(const broker::Message& m) {
         sys::Mutex::ScopedLock l(lock);
-        typename List::iterator i = std::find(observers.begin(), observers.end(), observer);
-        observers.erase(i);
+        set -= m.getReplicationId();
     }
 
-  protected:
-    typedef std::vector<boost::shared_ptr<Observer> > List;
+    void acquired(const broker::Message&) {}
 
-    sys::Mutex lock;
-    List observers;
+    void requeued(const broker::Message&) {}
 
-    template <class F> void each(F f) {
-        List copy;
-        {
-            sys::Mutex::ScopedLock l(lock);
-            copy = observers;
-        }
-        std::for_each(copy.begin(), copy.end(), f);
+    ReplicationIdSet snapshot() {
+        sys::Mutex::ScopedLock l(lock);
+        return set;
     }
+
+  private:
+    sys::Mutex lock;
+    ReplicationIdSet set;
 };
 
-}} // namespace qpid::broker
+}} // namespace qpid::ha
 
-#endif  /*!QPID_BROKER_OBSERVERS_H*/
+#endif  /*!QPID_HA_IDSETOBSERVER_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshots.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshots.h?rev=1493771&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshots.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshots.h Mon Jun 17 14:19:10 2013
@@ -0,0 +1,81 @@
+#ifndef QPID_HA_QUEUESNAPSHOTS_H
+#define QPID_HA_QUEUESNAPSHOTS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "QueueSnapshot.h"
+#include "hash.h"
+
+#include "qpid/assert.h"
+#include "qpid/broker/ConfigurationObserver.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/sys/Mutex.h"
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * ConfigurationObserver that maintains a map of the QueueSnapshot for each queue.
+ * THREAD SAFE.
+ */
+class QueueSnapshots : public broker::ConfigurationObserver
+{
+  public:
+    boost::shared_ptr<QueueSnapshot> get(const boost::shared_ptr<broker::Queue>& q) const {
+        sys::Mutex::ScopedLock l(lock);
+        SnapshotMap::const_iterator i = snapshots.find(q);
+        return i != snapshots.end() ? i->second : boost::shared_ptr<QueueSnapshot>();
+    }
+
+    // ConfigurationObserver overrides.
+    void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
+        sys::Mutex::ScopedLock l(lock);
+        boost::shared_ptr<QueueSnapshot> observer(new QueueSnapshot);
+        snapshots[q] = observer;
+        q->addObserver(observer);
+    }
+
+    void queueDestroy(const boost::shared_ptr<broker::Queue>& q) {
+        sys::Mutex::ScopedLock l(lock);
+        SnapshotMap::iterator i = snapshots.find(q);
+        if (i != snapshots.end()) {
+            q->removeObserver(i->second);
+            snapshots.erase(i);
+        }
+    }
+
+  private:
+    typedef qpid::sys::unordered_map<boost::shared_ptr<broker::Queue>,
+                                     boost::shared_ptr<QueueSnapshot>,
+                                     SharedPtrHasher<broker::Queue>
+                                     > SnapshotMap;
+    SnapshotMap snapshots;
+    mutable sys::Mutex lock;
+};
+
+
+}} // namespace qpid::ha
+
+#endif  /*!QPID_HA_QUEUESNAPSHOTS_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshots.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshots.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/ha/README.md
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/README.md?rev=1493771&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/README.md (added)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/README.md Mon Jun 17 14:19:10 2013
@@ -0,0 +1,80 @@
+
+Overview of HA replication
+==========================
+
+Message Identifiers
+-------------------
+
+Replication IDs are sequence numbers assigned to messages *before* a message is
+enqueued.  Originally the queue position number was used, but that was
+insufficient for two reasons:
+- We sometimes need to identify messages that are not yet enqueued, for example messages in an open transaction.
+- We don't want to require maintaining identical message sequences on every broker e.g. so transactions can be committed independently by each broker.
+
+We use the IDs to:
+- identify messages to dequeue on a backup.
+- remove extra messages from backup on failover.
+- avoid downloading messages already on backup on failover.
+
+
+On the primary
+--------------
+
+The main classes on the primary are as follows:
+
+`RemoteBackup`: Represents a remote backup broker. Container for per-queue
+information about the broker.
+
+Each (queue,backup) pair has an instance of either or both of the following
+classes:
+
+`QueueGuard`: A queue observer that delays completion of messages as they are
+enqueued and completes messages when they are acknowledged or dequeued.
+
+`ReplicatingSubscription`: A queue browser that sends messages to the backup and
+receives acknowledgments. Forwards acknowledgments to the `QueueGuard`.
+
+`ReplicatingSubscription` and `QueueGuard` are separate because the guard
+can be created before the subscription.
+
+Events intercepted by HA code:
+
+- enqueue: Message published to queue, completion delayed (QueueGuard)
+- deliver: Message delivered to ReplicatingSubscription and sent to backup.
+- acknowledge: Message acknowledged by backup (ReplicatingSubscription)
+- dequeue: Message removed from queue by a consumer (QueueGuard)
+
+Message states:
+- new: initial state.
+- sent: ReplicatingSubscription has sent message to backup.
+- delayed: QueueGuard has delayed completion.
+- delayed-sent: Both sent and delayed.
+- safe: Replication code is done with the message: it is acknowledged or dequeued.
+
+Events:
+- enqueue: message is enqueued on queue
+- deliver: message delivered to ReplicatingSubscription
+- acknowledged: message is acknowledged by backup
+- dequeued: message is dequeued by consumer.
+
+State transition diagram:
+
+    (new)--deliver-->(sent)--acknowledged/dequeued---------------->(safe)
+      |                 L---dequeued---------------------------------^
+      L-enqueue->(delayed)--dequeued---------------------------------|
+                  |                                                  |
+                  L--deliver->(delayed-sent)--acknowledged/dequeued--|
+
+
+A QueueGuard is set on the queue when a backup subscribes or when a backup is
+promoted. Messages before the _first guarded position_ cannot be delayed
+because they may have already been acknowledged to clients.
+
+A backup sends a set of pre-acknowledged messages when subscribing, messages
+that are already on the backup and therefore safe.
+
+A `ReplicatingSubscription` is _ready_ when all messages are safe or delayed.  We
+know this is the case when all the following conditions hold:
+
+- The `ReplicatingSubscription` has reached the position preceding the first guarded position AND
+- All messages prior to the first guarded position are safe.

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/README.md
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/README.md
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Mon Jun 17 14:19:10 2013
@@ -35,22 +35,19 @@ using boost::bind;
 
 RemoteBackup::RemoteBackup(
     const BrokerInfo& info, broker::Connection* c
-) : brokerInfo(info), replicationTest(NONE), connection(c), reportedReady(false)
+) : brokerInfo(info), replicationTest(NONE), started(false), connection(c), reportedReady(false)
 {
     std::ostringstream oss;
-    oss << "Primary: Remote backup " << info << ": ";
+    oss << "Remote backup at " << info << ": ";
     logPrefix = oss.str();
+    QPID_LOG(debug, logPrefix << "Connected");
 }
 
-void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool createGuards)
-{
-    queues.eachQueue(boost::bind(&RemoteBackup::catchupQueue, this, _1, createGuards));
-    QPID_LOG(debug, logPrefix << "Set " << catchupQueues.size() << " catch-up queues"
-             << (createGuards ? " and guards" : ""));
+RemoteBackup::~RemoteBackup() {
+    // Don't cancel here, cancel must be called explicitly in a locked context
+    // where we know the connection pointer is still good.
 }
 
-RemoteBackup::~RemoteBackup() { cancel(); }
-
 void RemoteBackup::cancel() {
     QPID_LOG(debug, logPrefix << "Cancelled " << (connection? "connected":"disconnected")
              << " backup: " << brokerInfo);
@@ -64,7 +61,7 @@ void RemoteBackup::cancel() {
 }
 
 bool RemoteBackup::isReady() {
-    return connection && catchupQueues.empty();
+    return started && connection && catchupQueues.empty();
 }
 
 void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message