qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1518982 - in /qpid/trunk/qpid/cpp/src: qpid/ha/ tests/
Date Fri, 30 Aug 2013 14:43:06 GMT
Author: aconway
Date: Fri Aug 30 14:43:06 2013
New Revision: 1518982

URL: http://svn.apache.org/r1518982
Log:
QPID-4327: HA clean up transaction artifacts at end of TX.

- Backups delete transactions on failover.
- TxReplicator cancel subscriptions when transaction is finished.
- TxReplicator rollback if destroyed prematurely.
- Handle special case of no backups for a tx.
- ha_tests.py: new and modified tests to cover the new functionality.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1518982&r1=1518981&r2=1518982&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Aug 30 14:43:06 2013
@@ -229,8 +229,8 @@ class BrokerReplicator::UpdateTracker {
     typedef boost::function<void (const std::string&)> CleanFn;
 
     UpdateTracker(const std::string& type_, // "queue" or "exchange"
-                  CleanFn f, const ReplicationTest& rt)
-        : type(type_), cleanFn(f), repTest(rt) {}
+                  CleanFn f)
+        : type(type_), cleanFn(f) {}
 
     /** Destructor cleans up remaining initial queues. */
     ~UpdateTracker() {
@@ -245,16 +245,10 @@ class BrokerReplicator::UpdateTracker {
     }
 
     /** Add an exchange name */
-    void addExchange(Exchange::shared_ptr ex)  {
-        if (repTest.getLevel(*ex))
-            initial.insert(ex->getName());
-    }
+    void addExchange(Exchange::shared_ptr ex)  { initial.insert(ex->getName()); }
 
     /** Add a queue name. */
-    void addQueue(Queue::shared_ptr q) {
-        if (repTest.getLevel(*q))
-            initial.insert(q->getName());
-    }
+    void addQueue(Queue::shared_ptr q) { initial.insert(q->getName()); }
 
     /** Received an event for name */
     void event(const std::string& name) {
@@ -281,7 +275,6 @@ class BrokerReplicator::UpdateTracker {
     std::string type;
     Names initial, events;
     CleanFn cleanFn;
-    ReplicationTest repTest;
 };
 
 namespace {
@@ -349,7 +342,8 @@ BrokerReplicator::~BrokerReplicator() { 
 
 namespace {
 void collectQueueReplicators(
-    const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator>
>& collect)
+    const boost::shared_ptr<Exchange>& ex,
+    set<boost::shared_ptr<QueueReplicator> >& collect)
 {
     boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
     if (qr) collect.insert(qr);
@@ -390,16 +384,13 @@ void BrokerReplicator::connected(Bridge&
 
     exchangeTracker.reset(
         new UpdateTracker("exchange",
-                          boost::bind(&BrokerReplicator::deleteExchange, this, _1),
-                          replicationTest));
-    exchanges.eachExchange(
-        boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1));
+                          boost::bind(&BrokerReplicator::deleteExchange, this, _1)));
+    exchanges.eachExchange(boost::bind(&BrokerReplicator::existingExchange, this, _1));
 
     queueTracker.reset(
         new UpdateTracker("queue",
-                          boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
-                          replicationTest));
-    queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1));
+                          boost::bind(&BrokerReplicator::deleteQueue, this, _1, true)));
+    queues.eachQueue(boost::bind(&BrokerReplicator::existingQueue, this, _1));
 
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
@@ -428,6 +419,21 @@ void BrokerReplicator::connected(Bridge&
     sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
 }
 
+// Called for each queue in existence when the backup connects to a primary.
+void BrokerReplicator::existingQueue(const boost::shared_ptr<Queue>& q) {
+    if (replicationTest.getLevel(*q)) {
+        QPID_LOG(debug, "Existing queue: " << q->getName());
+        queueTracker->addQueue(q);
+    }
+}
+
+void BrokerReplicator::existingExchange(const boost::shared_ptr<Exchange>& ex)
{
+    if (replicationTest.getLevel(*ex)) {
+        QPID_LOG(debug, "Existing exchange: " << ex->getName());
+        exchangeTracker->addExchange(ex);
+    }
+}
+
 void BrokerReplicator::route(Deliverable& msg) {
     // We transition from JOINING->CATCHUP on the first message received from the primary.
     // Until now we couldn't be sure if we had a good connection to the primary.
@@ -890,24 +896,36 @@ void BrokerReplicator::autoDeleteCheck(b
     }
 }
 
-// Callback function for accumulating exchange candidates
-namespace {
-	void exchangeAccumulatorCallback(vector<boost::shared_ptr<Exchange> >& c,
const Exchange::shared_ptr& i) {
-		c.push_back(i);
-	}
-}
+typedef vector<boost::shared_ptr<Exchange> > ExchangeVector;
+typedef vector<boost::shared_ptr<Queue> > QueueVector;
 
 // Called by ConnectionObserver::disconnected, disconnected from the network side.
 void BrokerReplicator::disconnected() {
     QPID_LOG(info, logPrefix << "Disconnected from primary " << primary);
     connection = 0;
-    // Clean up auto-delete queues
-    vector<boost::shared_ptr<Exchange> > collect;
-    // Make a copy so we can work outside the ExchangeRegistry lock
-    exchanges.eachExchange(
-        boost::bind(&exchangeAccumulatorCallback, boost::ref(collect), _1));
-    for_each(collect.begin(), collect.end(),
+
+    // Make copys of queues & exchanges so we can work outside the registry lock.
+
+    ExchangeVector exs;
+    exchanges.eachExchange(boost::bind(&ExchangeVector::push_back, &exs, _1));
+    for_each(exs.begin(), exs.end(),
              boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
+
+    QueueVector qs;
+    queues.eachQueue(boost::bind(&QueueVector::push_back, &qs, _1));
+    for_each(qs.begin(), qs.end(),
+             boost::bind(&BrokerReplicator::disconnectedQueue, this, _1));
+}
+
+// Called for queues existing when the backup is disconnected.
+void BrokerReplicator::disconnectedQueue(const boost::shared_ptr<Queue>& q) {
+    QPID_LOG(critical, "BrokerReplicator::disconnectedQueue" << q->getName());
+    boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(q->getName());
+    if (qr) {
+        qr->disconnect();
+        if (TxReplicator::isTxQueue(q->getName()))
+            deleteQueue(q->getName());
+    }
 }
 
 void BrokerReplicator::setMembership(const Variant::List& brokers) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1518982&r1=1518981&r2=1518982&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Fri Aug 30 14:43:06 2013
@@ -102,6 +102,9 @@ class BrokerReplicator : public broker::
     class ConnectionObserver;
 
     void connected(broker::Bridge&, broker::SessionHandler&);
+    void existingQueue(const boost::shared_ptr<broker::Queue>&);
+    void existingExchange(const boost::shared_ptr<broker::Exchange>&);
+    void disconnectedQueue(const boost::shared_ptr<broker::Queue>&);
 
     void doEventQueueDeclare(types::Variant::Map& values);
     void doEventQueueDelete(types::Variant::Map& values);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1518982&r1=1518981&r2=1518982&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Fri Aug 30 14:43:06 2013
@@ -214,6 +214,8 @@ void Primary::readyReplica(const Replica
 }
 
 void Primary::addReplica(ReplicatingSubscription& rs) {
+    // Note this is called before the ReplicatingSubscription has been activated
+    // on the queue.
     sys::Mutex::ScopedLock l(lock);
     replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs;
 }
@@ -231,6 +233,12 @@ void Primary::skip(
 void Primary::removeReplica(const ReplicatingSubscription& rs) {
     sys::Mutex::ScopedLock l(lock);
     replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue()));
+
+    TxMap::const_iterator i = txMap.find(rs.getQueue()->getName());
+    if (i != txMap.end()) {
+        boost::shared_ptr<PrimaryTxObserver> tx = i->second.lock();
+        if (tx) tx->cancel(rs);
+    }
 }
 
 // NOTE: Called with queue registry lock held.
@@ -387,16 +395,19 @@ void Primary::setCatchupQueues(const Rem
     backup->startCatchup();
 }
 
-void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) {
+shared_ptr<PrimaryTxObserver> Primary::makeTxObserver() {
     shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker));
     observer->initialize();
-    tx->setObserver(observer);
+    txMap[observer->getTxQueue()->getName()] = observer;
+    return observer;
+}
+
+void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) {
+    tx->setObserver(makeTxObserver());
 }
 
 void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) {
-    shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker));
-    observer->initialize();
-    dtx->setObserver(observer);
+    dtx->setObserver(makeTxObserver());
 }
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h?rev=1518982&r1=1518981&r2=1518982&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h Fri Aug 30 14:43:06 2013
@@ -30,6 +30,7 @@
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/unordered_map.h"
 #include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
 #include <boost/intrusive_ptr.hpp>
 #include <string>
 
@@ -54,6 +55,7 @@ class ReplicatingSubscription;
 class RemoteBackup;
 class QueueGuard;
 class Membership;
+class PrimaryTxObserver;
 
 /**
  * State associated with a primary broker:
@@ -113,6 +115,9 @@ class Primary : public Role
     typedef sys::unordered_map<UuidQueue, ReplicatingSubscription*,
                                Hasher<UuidQueue> > ReplicaMap;
 
+    // Map of PrimaryTxObservers by tx-queue name
+    typedef sys::unordered_map<std::string, boost::weak_ptr<PrimaryTxObserver> >
TxMap;
+
     RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&);
     void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&);
 
@@ -121,6 +126,7 @@ class Primary : public Role
     void checkReady(RemoteBackupPtr);
     void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);
     void deduplicate();
+    boost::shared_ptr<PrimaryTxObserver> makeTxObserver();
 
     mutable sys::Mutex lock;
     HaBroker& haBroker;
@@ -143,6 +149,7 @@ class Primary : public Role
     boost::shared_ptr<broker::BrokerObserver> brokerObserver;
     boost::intrusive_ptr<sys::TimerTask> timerTask;
     ReplicaMap replicas;
+    TxMap txMap;
 };
 }} // namespace qpid::ha
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1518982&r1=1518981&r2=1518982&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Fri Aug 30 14:43:06 2013
@@ -87,12 +87,13 @@ PrimaryTxObserver::PrimaryTxObserver(HaB
     // Latecomers that have replicated the transaction will be rolled back
     // when the tx-queue is deleted.
     //
-    BrokerInfo::Set infoSet(haBroker.getMembership().otherBackups());
-    std::transform(infoSet.begin(), infoSet.end(), inserter(members, members.begin()),
+    BrokerInfo::Set backups(haBroker.getMembership().otherBackups());
+    std::transform(backups.begin(), backups.end(), inserter(members, members.begin()),
 		   boost::bind(&BrokerInfo::getSystemId, _1));
 
     QPID_LOG(debug, logPrefix << "Started TX " << id);
     QPID_LOG(debug, logPrefix << "Members: " << members);
+    unprepared = unfinished = members;
 
     pair<QueuePtr, bool> result =
         broker.getQueues().declare(
@@ -102,8 +103,6 @@ PrimaryTxObserver::PrimaryTxObserver(HaB
     txQueue = result.first;
     txQueue->deliver(TxMembersEvent(members).message());
     // Do this last, it will start concurrent callbacks.
-    haBroker.getMembership().addCallback(
-        boost::bind(&PrimaryTxObserver::membership, this, _1));
 }
 
 PrimaryTxObserver::~PrimaryTxObserver() {}
@@ -146,8 +145,7 @@ bool PrimaryTxObserver::prepare() {
     QPID_LOG(debug, logPrefix << "Prepare");
     deduplicate(l);
     txQueue->deliver(TxPrepareEvent().message());
-    while (prepared != members && !failed)
-        lock.wait();
+    while (!unprepared.empty() && !failed) lock.wait();
     return !failed;
 }
 
@@ -155,27 +153,30 @@ void PrimaryTxObserver::commit() {
     sys::Mutex::ScopedLock l(lock);
     QPID_LOG(debug, logPrefix << "Commit");
     txQueue->deliver(TxCommitEvent().message());
-    destroy();
+    end(l);
 }
 
 void PrimaryTxObserver::rollback() {
     sys::Mutex::ScopedLock l(lock);
     QPID_LOG(debug, logPrefix << "Rollback");
     txQueue->deliver(TxRollbackEvent().message());
-    destroy();
+    end(l);
 }
 
-void PrimaryTxObserver::destroy() {
-    // Destroying the queue will result in destruction of this when
-    // the queues observer references are cleared.
-    haBroker.deleteQueue(txQueue->getName());
+void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
+    // Don't destroy the tx-queue if there are connected subscriptions.
+    if (unfinished.empty()) {
+        // Destroying the queue will result in destruction of this when
+        // the queues observer references are cleared.
+        haBroker.deleteQueue(txQueue->getName());
+    }
 }
 
 void PrimaryTxObserver::txPrepareOkEvent(const string& data) {
     sys::Mutex::ScopedLock l(lock);
     types::Uuid backup = decodeStr<TxPrepareOkEvent>(data).broker;
     QPID_LOG(debug, logPrefix << "Backup prepared ok: " << backup);
-    prepared.insert(backup);
+    unprepared.erase(backup);
     lock.notify();
 }
 
@@ -183,20 +184,21 @@ void PrimaryTxObserver::txPrepareFailEve
     sys::Mutex::ScopedLock l(lock);
     types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker;
     QPID_LOG(error, logPrefix << "Backup prepare failed: " << backup);
-    prepared.insert(backup);
+    unprepared.erase(backup);
     failed = true;
     lock.notify();
 }
 
-void PrimaryTxObserver::membership(const BrokerInfo::Map& update) {
+void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) {
     sys::Mutex::ScopedLock l(lock);
-    for (UuidSet::iterator i = members.begin(); i != members.end(); ++i)
-        if (!update.count(*i)) { // A broker is down
-            failed = true;
-            lock.notify();
-            return;
-        }
+    types::Uuid backup = rs.getBrokerInfo().getSystemId();
+    if (unprepared.find(backup) != unprepared.end()) {
+        failed = true;            // Canceled before prepared.
+        unprepared.erase(backup); // Consider it prepared-fail
+    }
+    unfinished.erase(backup);
+    lock.notify();
+    end(l);
 }
 
-
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h?rev=1518982&r1=1518981&r2=1518982&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h Fri Aug 30 14:43:06 2013
@@ -41,6 +41,7 @@ class Consumer;
 
 namespace ha {
 class HaBroker;
+class ReplicatingSubscription;
 
 /**
  * Observe events in the lifecycle of a transaction.
@@ -74,6 +75,10 @@ class PrimaryTxObserver : public broker:
     void rollback();
 
     types::Uuid getId() const { return id; }
+    QueuePtr getTxQueue() const { return txQueue; }
+
+    // Notify that a backup subscription has been cancelled.
+    void cancel(const ReplicatingSubscription&);
 
   private:
     class Exchange;
@@ -82,11 +87,10 @@ class PrimaryTxObserver : public broker:
 
     void membership(const BrokerInfo::Map&);
     void deduplicate(sys::Mutex::ScopedLock&);
+    void end(sys::Mutex::ScopedLock&);
     void txPrepareOkEvent(const std::string& data);
     void txPrepareFailEvent(const std::string& data);
-    void consumerRemoved(const broker::Consumer&);
-    bool isPrepared(sys::Mutex::ScopedLock&);
-    void destroy();
+
 
     sys::Monitor lock;
     std::string logPrefix;
@@ -96,8 +100,10 @@ class PrimaryTxObserver : public broker:
     QueuePtr txQueue;
     QueueIdsMap enqueues;
     bool failed;
-    UuidSet members;
-    UuidSet prepared;
+
+    UuidSet members;            // All members of transaction.
+    UuidSet unprepared;         // Members that have not yet responded to prepare.
+    UuidSet unfinished;         // Members that have not yet disconnected.
 };
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1518982&r1=1518981&r2=1518982&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Aug 30 14:43:06 2013
@@ -169,6 +169,11 @@ void QueueReplicator::activate() {
         boost::shared_ptr<QueueObserver>(new QueueObserver(shared_from_this())));
 }
 
+void QueueReplicator::disconnect() {
+    Mutex::ScopedLock l(lock);
+    sessionHandler = 0;
+}
+
 QueueReplicator::~QueueReplicator() {}
 
 // Called from Queue::destroyed()
@@ -220,6 +225,14 @@ void QueueReplicator::initializeBridge(B
     QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments);
 }
 
+void QueueReplicator::cancel(Mutex::ScopedLock&) {
+    if (sessionHandler) {
+        // Cancel the replicating subscription.
+        AMQP_ServerProxy peer(sessionHandler->out);
+        peer.getMessage().cancel(getName());
+    }
+}
+
 namespace {
 template <class T> T decodeContent(Message& m) {
     std::string content = m.getContent();

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1518982&r1=1518981&r2=1518982&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Fri Aug 30 14:43:06 2013
@@ -68,7 +68,8 @@ class QueueReplicator : public broker::E
 
     ~QueueReplicator();
 
-    void activate();    // Must be called immediately after constructor.
+    void activate();        // Must be called immediately after constructor.
+    void disconnect();      // Called when we are disconnected from the primary.
 
     std::string getType() const;
 
@@ -93,6 +94,7 @@ class QueueReplicator : public broker::E
 
     virtual void deliver(const broker::Message&);
     virtual void destroy();             // Called when the queue is destroyed.
+    void cancel(sys::Mutex::ScopedLock&);
 
     sys::Mutex lock;
     HaBroker& haBroker;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp?rev=1518982&r1=1518981&r2=1518982&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp Fri Aug 30 14:43:06 2013
@@ -79,6 +79,7 @@ TxReplicator::TxReplicator(
     txBuffer(new broker::TxBuffer),
     store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
     channel(link->nextChannel()),
+    complete(false),
     dequeueState(hb.getBroker().getQueues())
 {
     string id(getTxId(txQueue->getName()));
@@ -200,16 +201,18 @@ void TxReplicator::prepare(const string&
     }
 }
 
-void TxReplicator::commit(const string&, sys::Mutex::ScopedLock&) {
+void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) {
     QPID_LOG(debug, logPrefix << "Commit");
     if (context.get()) store->commit(*context);
     txBuffer->commit();
+    end(l);
 }
 
-void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock&) {
+void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) {
     QPID_LOG(debug, logPrefix << "Rollback");
     if (context.get()) store->abort(*context);
     txBuffer->rollback();
+    end(l);
 }
 
 void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) {
@@ -223,4 +226,15 @@ void TxReplicator::members(const string&
     }
 }
 
+void TxReplicator::end(sys::Mutex::ScopedLock& l) {
+    complete = true;
+    cancel(l);
+}
+
+void TxReplicator::destroy() {
+    QueueReplicator::destroy();
+    sys::Mutex::ScopedLock l(lock);
+    if (!complete) rollback(string(), l);
+}
+
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h?rev=1518982&r1=1518981&r2=1518982&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h Fri Aug 30 14:43:06 2013
@@ -62,6 +62,9 @@ class TxReplicator : public QueueReplica
 
     std::string getType() const;
 
+    // QueueReplicator overrides
+    void destroy();
+
   protected:
 
     void deliver(const broker::Message&);
@@ -80,6 +83,7 @@ class TxReplicator : public QueueReplica
     void commit(const std::string& data, sys::Mutex::ScopedLock&);
     void rollback(const std::string& data, sys::Mutex::ScopedLock&);
     void members(const std::string& data, sys::Mutex::ScopedLock&);
+    void end(sys::Mutex::ScopedLock&);
 
     std::string logPrefix;
     TxEnqueueEvent enq;         // Enqueue data for next deliver.
@@ -87,6 +91,7 @@ class TxReplicator : public QueueReplica
     broker::MessageStore* store;
     std::auto_ptr<broker::TransactionContext> context;
     framing::ChannelId channel; // Channel to send prepare-complete.
+    bool complete;
 
     // Class to process dequeues and create DeliveryRecords to populate a
     // TxAccept.

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1518982&r1=1518981&r2=1518982&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Fri Aug 30 14:43:06 2013
@@ -415,6 +415,10 @@ class BrokerTest(TestCase):
     Provides a well-known working directory for each test.
     """
 
+    def __init__(self, *args, **kwargs):
+        self.longMessage = True # Enable long messages for assert*(..., msg=xxx)
+        TestCase.__init__(self, *args, **kwargs)
+
     # Environment settings.
     qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
     ha_lib = os.getenv("HA_LIB")

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1518982&r1=1518981&r2=1518982&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Fri Aug 30 14:43:06 2013
@@ -48,9 +48,24 @@ class QmfAgent(object):
             address, client_properties={"qpid.ha-admin":1}, **kwargs)
         self._agent = BrokerAgent(self._connection)
 
-    def get_queues(self):
+    def queues(self):
         return [q.values['name'] for q in self._agent.getAllQueues()]
 
+    def repsub_queue(self, sub):
+        """If QMF subscription sub is a replicating subscription return
+        the name of the replicated queue, else return None"""
+        session_name = self.getSession(sub.sessionRef).name
+        m = re.search("qpid.ha-q:(.*)\.", session_name)
+        return m and m.group(1)
+
+    def repsub_queues(self):
+        """Return queue names for all replicating subscriptions"""
+        return filter(None, [self.repsub_queue(s) for s in self.getAllSubscriptions()])
+
+    def tx_queues(self):
+        """Return names of all tx-queues"""
+        return [q for q in self.queues() if q.startswith("qpid.ha-tx")]
+
     def __getattr__(self, name):
         a = getattr(self._agent, name)
         return a

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1518982&r1=1518981&r2=1518982&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Aug 30 14:43:06 2013
@@ -1320,11 +1320,18 @@ class TransactionTests(BrokerTest):
             sb.send(m)
         return tx
 
+    def tx_subscriptions(self, broker):
+        """Return list of queue names for tx subscriptions"""
+        return [q for q in broker.agent().repsub_queues()
+                    if q.startswith("qpid.ha-tx")]
+
     def test_tx_simple_commit(self):
         cluster = HaCluster(self, 2, test_store=True)
         tx = self.tx_simple_setup(cluster[0])
         tx.sync()
 
+        self.assertEqual(1, len(self.tx_subscriptions(cluster[0]))) # One backup of the transaction
+
         # NOTE: backup does not process transactional dequeues until prepare
         cluster[1].assert_browse_backup("a", ["x","y","z"])
         cluster[1].assert_browse_backup("b", ['0', '1', '2'])
@@ -1333,10 +1340,12 @@ class TransactionTests(BrokerTest):
         tx.commit()
         tx.sync()
 
-        for b in cluster:
-            b.assert_browse_backup("a", [], msg=b)
-            b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
+        for b in cluster: self.assert_simple_commit_outcome(b)
+        self.assertEqual(0, len(self.tx_subscriptions(cluster[0]))) # Backup tx subscription
cancelled.
 
+    def assert_simple_commit_outcome(self, b):
+        b.assert_browse_backup("a", [], msg=b)
+        b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
         # Check for expected actions on the store
         expect = """<enqueue a x>
 <enqueue a y>
@@ -1347,42 +1356,59 @@ class TransactionTests(BrokerTest):
 <dequeue a z tx=1>
 <commit tx=1>
 """
-        self.assertEqual(expect, open_read(cluster[0].store_log))
-        self.assertEqual(expect, open_read(cluster[1].store_log))
+        self.assertEqual(expect, open_read(b.store_log), msg=b)
+        # Check that transaction artifacts are cleaned up.
+        self.assertEqual([], b.agent().tx_queues(), msg=b)
 
     def test_tx_simple_rollback(self):
         cluster = HaCluster(self, 2, test_store=True)
         tx = self.tx_simple_setup(cluster[0])
         tx.acknowledge()
         tx.rollback()
-        for b in cluster:
-            b.assert_browse_backup("a", ["x","y","z"], msg=b)
-            b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
+        for b in cluster: self.assert_simple_rollback_outcome(b)
+
+    def assert_simple_rollback_outcome(self, b):
+        b.assert_browse_backup("a", ["x","y","z"], msg=b)
+        b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
         # Check for expected actions on the store
         expect = """<enqueue a x>
 <enqueue a y>
 <enqueue a z>
 """
-        self.assertEqual(open_read(cluster[0].store_log), expect)
-        self.assertEqual(open_read(cluster[1].store_log), expect)
+        self.assertEqual(open_read(b.store_log), expect, msg=b)
+        # Check that transaction artifacts are cleaned up.
+        self.assertEqual([], b.agent().tx_queues(), msg=b)
 
     def test_tx_simple_failover(self):
-        cluster = HaCluster(self, 2, test_store=True)
+        cluster = HaCluster(self, 3, test_store=True)
         tx = self.tx_simple_setup(cluster[0])
+        tx.sync()
         tx.acknowledge()
         cluster.bounce(0)       # Should cause roll-back
-        cluster[0].wait_status("ready")
-        for b in cluster:
-            b.assert_browse_backup("a", ["x","y","z"], msg=b)
-            b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
+        cluster[0].wait_status("ready") # Restarted.
+        cluster[1].wait_status("active") # Promoted.
+        cluster[2].wait_status("ready")  # Failed over.
+        for b in cluster: self.assert_simple_rollback_outcome(b)
+
+    def test_tx_no_backups(self):
+        """Test the special case of a TX where there are no backups"""
+
+        # Test commit
+        cluster = HaCluster(self, 1, test_store=True)
+        tx = self.tx_simple_setup(cluster[0])
+        tx.acknowledge()
+        tx.commit()
+        tx.sync()
+        self.assert_simple_commit_outcome(cluster[0])
+
+        # Test rollback
+        cluster = HaCluster(self, 1, test_store=True)
+        tx = self.tx_simple_setup(cluster[0])
+        tx.acknowledge()
+        tx.rollback()
+        tx.sync()
+        self.assert_simple_rollback_outcome(cluster[0])
 
-        # Check for expected actions on the store
-        expect = """<enqueue a x>
-<enqueue a y>
-<enqueue a z>
-"""
-        self.assertEqual(open_read(cluster[0].store_log), expect)
-        self.assertEqual(open_read(cluster[1].store_log), expect)
 
     def test_tx_backup_fail(self):
         cluster = HaCluster(
@@ -1400,26 +1426,23 @@ class TransactionTests(BrokerTest):
         """Test cluster members joining/leaving cluster.
         Also check that tx-queues are cleaned up at end of transaction."""
 
-        def tx_queues(broker):
-            return [q for q in broker.agent().get_queues() if q.startswith("qpid.ha-tx")]
-
         cluster = HaCluster(self, 3)
 
         # Leaving
         tx = cluster[0].connect().session(transactional=True)
         s = tx.sender("q;{create:always}")
         s.send("a", sync=True)
-        self.assertEqual([1,1,1], [len(tx_queues(b)) for b in cluster])
+        self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster])
         cluster[1].kill(final=False)
         s.send("b")
         self.assertRaises(ServerError, tx.commit)
-        self.assertEqual([[],[]], [tx_queues(b) for b in [cluster[0],cluster[2]]])
+        self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]])
 
         # Joining
         tx = cluster[0].connect().session(transactional=True)
         s = tx.sender("q;{create:always}")
         s.send("foo")
-        tx_q = tx_queues(cluster[0])[0]
+        tx_q = cluster[0].agent().tx_queues()[0]
         cluster.restart(1)
         # Verify the new member should not be in the transaction.
         # but should receive the result of the transaction via normal replication.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message